1
14
15 package com.liferay.portal.increment;
16
17 import com.liferay.portal.kernel.concurrent.BatchablePipe;
18 import com.liferay.portal.kernel.log.Log;
19 import com.liferay.portal.kernel.log.LogFactoryUtil;
20 import com.liferay.portal.kernel.messaging.Message;
21 import com.liferay.portal.kernel.messaging.MessageRunnable;
22
23 import java.util.concurrent.RejectedExecutionHandler;
24 import java.util.concurrent.ThreadPoolExecutor;
25
26
32 public class BufferedIncrementDiscardPolicy
33 implements RejectedExecutionHandler {
34
35 @SuppressWarnings("rawtypes")
36 public void rejectedExecution(
37 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
38
39 MessageRunnable messageRunnable = (MessageRunnable)runnable;
40
41 Message message = messageRunnable.getMessage();
42
43 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
44 (BatchablePipe<String, BufferedIncreasableEntry>)
45 message.getPayload();
46
47 for (int i = 0; i < _discardNumber; i++) {
48 BufferedIncreasableEntry bufferedIncreasableEntry =
49 (BufferedIncreasableEntry)batchablePipe.take();
50
51 if (bufferedIncreasableEntry == null) {
52 break;
53 }
54 else if (_log.isInfoEnabled()) {
55 _log.info(
56 "Discarding BufferedIncreasableEntry " +
57 bufferedIncreasableEntry);
58 }
59 }
60 }
61
62 public void setDiscardNumber(int discardNumber) {
63 _discardNumber = discardNumber;
64 }
65
66 private static Log _log = LogFactoryUtil.getLog(
67 BufferedIncrementDiscardPolicy.class);
68
69 private int _discardNumber = 1;
70
71 }