1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.kernel.cache.cluster;
16  
17  import com.liferay.portal.kernel.concurrent.CoalescedPipe;
18  import com.liferay.portal.kernel.log.Log;
19  import com.liferay.portal.kernel.log.LogFactoryUtil;
20  
21  import java.util.concurrent.atomic.AtomicInteger;
22  import java.util.concurrent.atomic.AtomicLong;
23  
24  /**
25   * <a href="BasePortalCacheClusterChannel.java.html"><b><i>View Source</i></b>
26   * </a>
27   *
28   * @author Shuyang Zhou
29   */
30  public abstract class BasePortalCacheClusterChannel
31      extends Thread implements PortalCacheClusterChannel, Runnable {
32  
33      public BasePortalCacheClusterChannel() {
34          _dispatchThread = new Thread(
35              this,
36              "PortalCacheClusterChannel dispatch thread-" +
37                  _dispatchThreadCounter.getAndIncrement());
38          _eventQueue = new CoalescedPipe<PortalCacheClusterEvent>(
39              new PortalCacheClusterEventCoalesceComparator());
40      }
41  
42      public void destroy() {
43          _destroy = true;
44  
45          _dispatchThread.interrupt();
46      }
47  
48      public abstract void dispatchEvent(PortalCacheClusterEvent event);
49  
50      public long getCoalescedEventNumber() {
51          return _eventQueue.coalescedCount();
52      }
53  
54      public int getPendingEventNumber() {
55          return _eventQueue.pendingCount();
56      }
57  
58      public long getSentEventNumber() {
59          return _sentEventCounter.get();
60      }
61  
62      public void run() {
63          while (true) {
64              try {
65                  if (_destroy) {
66                      Object[] events = _eventQueue.takeSnapshot();
67  
68                      for (Object event : events) {
69                          dispatchEvent((PortalCacheClusterEvent)event);
70  
71                          _sentEventCounter.incrementAndGet();
72                      }
73  
74                      break;
75                  }
76                  else {
77                      try {
78                          PortalCacheClusterEvent portalCacheClusterEvent =
79                              _eventQueue.take();
80  
81                          dispatchEvent(portalCacheClusterEvent);
82  
83                          _sentEventCounter.incrementAndGet();
84                      }
85                      catch (InterruptedException ie) {
86                      }
87                  }
88              }
89              catch (Throwable t) {
90                  if (_log.isWarnEnabled()) {
91                      _log.warn("Please fix the unexpected throwable", t);
92                  }
93              }
94          }
95      }
96  
97      public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
98          if (_started == false) {
99              synchronized (this) {
100                 if (_started == false) {
101                     _dispatchThread.start();
102 
103                     _started = true;
104                 }
105             }
106         }
107 
108         try {
109             _eventQueue.put(portalCacheClusterEvent);
110         }
111         catch (InterruptedException ie) {
112         }
113     }
114 
115     private static Log _log = LogFactoryUtil.getLog(
116         BasePortalCacheClusterChannel.class);
117 
118     private static AtomicInteger _dispatchThreadCounter = new AtomicInteger(0);
119 
120     private volatile boolean _destroy = false;
121     private final Thread _dispatchThread;
122     private final CoalescedPipe<PortalCacheClusterEvent> _eventQueue;
123     private final AtomicLong _sentEventCounter = new AtomicLong(0);
124     private volatile boolean _started = false;
125 
126 }