1
14
15 package com.liferay.portal.kernel.cache.cluster;
16
17 import com.liferay.portal.kernel.concurrent.CoalescedPipe;
18 import com.liferay.portal.kernel.log.Log;
19 import com.liferay.portal.kernel.log.LogFactoryUtil;
20
21 import java.util.concurrent.atomic.AtomicInteger;
22 import java.util.concurrent.atomic.AtomicLong;
23
24
30 public abstract class BasePortalCacheClusterChannel
31 extends Thread implements PortalCacheClusterChannel, Runnable {
32
33 public BasePortalCacheClusterChannel() {
34 _dispatchThread = new Thread(
35 this,
36 "PortalCacheClusterChannel dispatch thread-" +
37 _dispatchThreadCounter.getAndIncrement());
38 _eventQueue = new CoalescedPipe<PortalCacheClusterEvent>(
39 new PortalCacheClusterEventCoalesceComparator());
40 }
41
42 public void destroy() {
43 _destroy = true;
44
45 _dispatchThread.interrupt();
46 }
47
48 public abstract void dispatchEvent(PortalCacheClusterEvent event);
49
50 public long getCoalescedEventNumber() {
51 return _eventQueue.coalescedCount();
52 }
53
54 public int getPendingEventNumber() {
55 return _eventQueue.pendingCount();
56 }
57
58 public long getSentEventNumber() {
59 return _sentEventCounter.get();
60 }
61
62 public void run() {
63 while (true) {
64 try {
65 if (_destroy) {
66 Object[] events = _eventQueue.takeSnapshot();
67
68 for (Object event : events) {
69 dispatchEvent((PortalCacheClusterEvent)event);
70
71 _sentEventCounter.incrementAndGet();
72 }
73
74 break;
75 }
76 else {
77 try {
78 PortalCacheClusterEvent portalCacheClusterEvent =
79 _eventQueue.take();
80
81 dispatchEvent(portalCacheClusterEvent);
82
83 _sentEventCounter.incrementAndGet();
84 }
85 catch (InterruptedException ie) {
86 }
87 }
88 }
89 catch (Throwable t) {
90 if (_log.isWarnEnabled()) {
91 _log.warn("Please fix the unexpected throwable", t);
92 }
93 }
94 }
95 }
96
97 public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
98 if (_started == false) {
99 synchronized (this) {
100 if (_started == false) {
101 _dispatchThread.start();
102
103 _started = true;
104 }
105 }
106 }
107
108 try {
109 _eventQueue.put(portalCacheClusterEvent);
110 }
111 catch (InterruptedException ie) {
112 }
113 }
114
115 private static Log _log = LogFactoryUtil.getLog(
116 BasePortalCacheClusterChannel.class);
117
118 private static AtomicInteger _dispatchThreadCounter = new AtomicInteger(0);
119
120 private volatile boolean _destroy = false;
121 private final Thread _dispatchThread;
122 private final CoalescedPipe<PortalCacheClusterEvent> _eventQueue;
123 private final AtomicLong _sentEventCounter = new AtomicLong(0);
124 private volatile boolean _started = false;
125
126 }