1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28 import com.liferay.portal.kernel.util.NamedThreadFactory;
29
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.LinkedBlockingQueue;
33 import java.util.concurrent.ThreadPoolExecutor;
34 import java.util.concurrent.TimeUnit;
35
36
41 public abstract class BaseDestination implements Destination {
42
43 public BaseDestination(String name) {
44 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
45 }
46
47 public BaseDestination(
48 String name, int workersCoreSize, int workersMaxSize) {
49
50 _name = name;
51 _workersCoreSize = workersCoreSize;
52 _workersMaxSize = workersMaxSize;
53
54 open();
55 }
56
57 public void addDestinationEventListener(
58 DestinationEventListener destinationEventListener) {
59
60 _destinationEventListeners.add(destinationEventListener);
61 }
62
63 public synchronized void close() {
64 close(false);
65 }
66
67 public synchronized void close(boolean force) {
68 doClose(force);
69 }
70
71 public void copyDestinationEventListeners(Destination destination) {
72 for (DestinationEventListener destinationEventListener :
73 _destinationEventListeners) {
74
75 destination.addDestinationEventListener(
76 destinationEventListener);
77 }
78 }
79
80 public void copyMessageListeners(Destination destination) {
81 for (MessageListener messageListener : _messageListeners) {
82 InvokerMessageListener invokerMessageListener =
83 (InvokerMessageListener)messageListener;
84
85 destination.register(
86 invokerMessageListener.getMessageListener(),
87 invokerMessageListener.getClassLoader());
88 }
89 }
90
91 public DestinationStatistics getDestinationStatistics() {
92 DestinationStatistics destinationStatistics =
93 new DestinationStatistics();
94
95 destinationStatistics.setActiveThreadCount(
96 _threadPoolExecutor.getActiveCount());
97 destinationStatistics.setCurrentThreadCount(
98 _threadPoolExecutor.getPoolSize());
99 destinationStatistics.setLargestThreadCount(
100 _threadPoolExecutor.getLargestPoolSize());
101 destinationStatistics.setMaxThreadPoolSize(
102 _threadPoolExecutor.getMaximumPoolSize());
103 destinationStatistics.setMinThreadPoolSize(
104 _threadPoolExecutor.getCorePoolSize());
105 destinationStatistics.setPendingMessageCount(
106 _threadPoolExecutor.getQueue().size());
107 destinationStatistics.setSentMessageCount(
108 _threadPoolExecutor.getCompletedTaskCount());
109
110 return destinationStatistics;
111 }
112
113 public int getMessageListenerCount() {
114 return _messageListeners.size();
115 }
116
117 public String getName() {
118 return _name;
119 }
120
121 public boolean isRegistered() {
122 if (getMessageListenerCount() > 0) {
123 return true;
124 }
125 else {
126 return false;
127 }
128 }
129
130 public synchronized void open() {
131 doOpen();
132 }
133
134 public boolean register(MessageListener messageListener) {
135 InvokerMessageListener invokerMessageListener =
136 new InvokerMessageListener(messageListener);
137
138 return registerMessageListener(invokerMessageListener);
139 }
140
141 public boolean register(
142 MessageListener messageListener, ClassLoader classloader) {
143
144 InvokerMessageListener invokerMessageListener =
145 new InvokerMessageListener(messageListener, classloader);
146
147 return registerMessageListener(invokerMessageListener);
148 }
149
150 public void removeDestinationEventListener(
151 DestinationEventListener destinationEventListener) {
152
153 _destinationEventListeners.remove(destinationEventListener);
154 }
155
156 public void removeDestinationEventListeners() {
157 _destinationEventListeners.clear();
158 }
159
160 public void send(Message message) {
161 if (_messageListeners.isEmpty()) {
162 if (_log.isDebugEnabled()) {
163 _log.debug("No message listeners for destination " + getName());
164 }
165
166 return;
167 }
168
169 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
170
171 if (threadPoolExecutor.isShutdown()) {
172 throw new IllegalStateException(
173 "Destination " + getName() + " is shutdown and cannot " +
174 "receive more messages");
175 }
176
177 dispatch(_messageListeners, message);
178 }
179
180 public boolean unregister(MessageListener messageListener) {
181 InvokerMessageListener invokerMessageListener =
182 new InvokerMessageListener(messageListener);
183
184 return unregisterMessageListener(invokerMessageListener);
185 }
186
187 public boolean unregister(
188 MessageListener messageListener, ClassLoader classloader) {
189
190 InvokerMessageListener invokerMessageListener =
191 new InvokerMessageListener(messageListener, classloader);
192
193 return unregisterMessageListener(invokerMessageListener);
194 }
195
196 public void unregisterMessageListeners() {
197 for (MessageListener messageListener : _messageListeners) {
198 unregisterMessageListener((InvokerMessageListener)messageListener);
199 }
200 }
201
202 protected abstract void dispatch(
203 Set<MessageListener> messageListeners, Message message);
204
205 protected void doClose(boolean force) {
206 if (!_threadPoolExecutor.isShutdown() &&
207 !_threadPoolExecutor.isTerminating()) {
208
209 if (!force) {
210 _threadPoolExecutor.shutdown();
211 }
212 else {
213 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
214
215 if (_log.isInfoEnabled()) {
216 _log.info(
217 "The following " + pendingTasks.size() + " tasks " +
218 "were not executed due to shutown: " +
219 pendingTasks);
220 }
221 }
222 }
223 }
224
225 protected void doOpen() {
226 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
227 _threadPoolExecutor = new ThreadPoolExecutor(
228 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
229 new LinkedBlockingQueue<Runnable>(),
230 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
231 }
232 }
233
234 protected void fireMessageListenerRegisteredEvent(
235 MessageListener messageListener) {
236
237 for (DestinationEventListener destinationEventListener :
238 _destinationEventListeners) {
239
240 destinationEventListener.messageListenerRegistered(
241 getName(), messageListener);
242 }
243 }
244
245 protected void fireMessageListenerUnregisteredEvent(
246 MessageListener messageListener) {
247
248 for (DestinationEventListener listener : _destinationEventListeners) {
249 listener.messageListenerUnregistered(getName(), messageListener);
250 }
251 }
252
253 protected ThreadPoolExecutor getThreadPoolExecutor() {
254 return _threadPoolExecutor;
255 }
256
257 protected int getWorkersCoreSize() {
258 return _workersCoreSize;
259 }
260
261 protected int getWorkersMaxSize() {
262 return _workersMaxSize;
263 }
264
265 protected boolean registerMessageListener(
266 InvokerMessageListener invokerMessageListener) {
267
268 boolean registered = _messageListeners.add(invokerMessageListener);
269
270 if (registered) {
271 fireMessageListenerRegisteredEvent(
272 invokerMessageListener.getMessageListener());
273 }
274
275 return registered;
276 }
277
278 protected boolean unregisterMessageListener(
279 InvokerMessageListener invokerMessageListener) {
280
281 boolean unregistered = _messageListeners.remove(invokerMessageListener);
282
283 if (unregistered) {
284 fireMessageListenerUnregisteredEvent(
285 invokerMessageListener.getMessageListener());
286 }
287
288 return unregistered;
289 }
290
291 private static final int _WORKERS_CORE_SIZE = 2;
292
293 private static final int _WORKERS_MAX_SIZE = 5;
294
295 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
296
297 private Set<DestinationEventListener> _destinationEventListeners =
298 new ConcurrentHashSet<DestinationEventListener>();
299 private Set<MessageListener> _messageListeners =
300 new ConcurrentHashSet<MessageListener>();
301 private String _name;
302 private ThreadPoolExecutor _threadPoolExecutor;
303 private int _workersCoreSize;
304 private int _workersMaxSize;
305
306 }