1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.increment;
16  
17  import com.liferay.portal.kernel.concurrent.BatchablePipe;
18  import com.liferay.portal.kernel.log.Log;
19  import com.liferay.portal.kernel.log.LogFactoryUtil;
20  import com.liferay.portal.kernel.messaging.Message;
21  import com.liferay.portal.kernel.messaging.MessageRunnable;
22  
23  import java.util.concurrent.RejectedExecutionHandler;
24  import java.util.concurrent.ThreadPoolExecutor;
25  
26  /**
27   * <a href="BufferedIncrementDiscardPolicy.java.html"><b><i>View Source</i></b>
28   * </a>
29   *
30   * @author Shuyang Zhou
31   */
32  public class BufferedIncrementDiscardPolicy
33      implements RejectedExecutionHandler {
34  
35      @SuppressWarnings("rawtypes")
36      public void rejectedExecution(
37          Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
38  
39          MessageRunnable messageRunnable = (MessageRunnable)runnable;
40  
41          Message message = messageRunnable.getMessage();
42  
43          BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
44              (BatchablePipe<String, BufferedIncreasableEntry>)
45                  message.getPayload();
46  
47          for (int i = 0; i < _discardNumber; i++) {
48              BufferedIncreasableEntry bufferedIncreasableEntry =
49                  (BufferedIncreasableEntry)batchablePipe.take();
50  
51              if (bufferedIncreasableEntry == null) {
52                  break;
53              }
54              else if (_log.isInfoEnabled()) {
55                  _log.info(
56                      "Discarding BufferedIncreasableEntry " +
57                          bufferedIncreasableEntry);
58              }
59          }
60      }
61  
62      public void setDiscardNumber(int discardNumber) {
63          _discardNumber = discardNumber;
64      }
65  
66      private static Log _log = LogFactoryUtil.getLog(
67          BufferedIncrementDiscardPolicy.class);
68  
69      private int _discardNumber = 1;
70  
71  }