1
14
15 package com.liferay.portal.kernel.messaging;
16
17 import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
18 import com.liferay.portal.kernel.log.Log;
19 import com.liferay.portal.kernel.log.LogFactoryUtil;
20 import com.liferay.portal.kernel.util.NamedThreadFactory;
21 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
22 import com.liferay.portal.kernel.util.StringPool;
23 import com.liferay.portal.kernel.util.Validator;
24
25 import java.util.Collections;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.LinkedBlockingQueue;
29 import java.util.concurrent.RejectedExecutionHandler;
30 import java.util.concurrent.ThreadPoolExecutor;
31 import java.util.concurrent.TimeUnit;
32
33
38 public abstract class BaseDestination implements Destination {
39
40 public BaseDestination() {
41 }
42
43
46 public BaseDestination(String name) {
47 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
48 }
49
50
53 public BaseDestination(
54 String name, int workersCoreSize, int workersMaxSize) {
55
56 _name = name;
57 _workersCoreSize = workersCoreSize;
58 _workersMaxSize = workersMaxSize;
59
60 open();
61 }
62
63 public void addDestinationEventListener(
64 DestinationEventListener destinationEventListener) {
65
66 _destinationEventListeners.add(destinationEventListener);
67 }
68
69 public void afterPropertiesSet() {
70 if (Validator.isNull(_name)) {
71 throw new IllegalArgumentException("Name is null");
72 }
73
74 open();
75 }
76
77 public synchronized void close() {
78 close(false);
79 }
80
81 public synchronized void close(boolean force) {
82 doClose(force);
83 }
84
85 public void copyDestinationEventListeners(Destination destination) {
86 for (DestinationEventListener destinationEventListener :
87 _destinationEventListeners) {
88
89 destination.addDestinationEventListener(
90 destinationEventListener);
91 }
92 }
93
94 public void copyMessageListeners(Destination destination) {
95 for (MessageListener messageListener : _messageListeners) {
96 InvokerMessageListener invokerMessageListener =
97 (InvokerMessageListener)messageListener;
98
99 destination.register(
100 invokerMessageListener.getMessageListener(),
101 invokerMessageListener.getClassLoader());
102 }
103 }
104
105 public DestinationStatistics getDestinationStatistics() {
106 DestinationStatistics destinationStatistics =
107 new DestinationStatistics();
108
109 destinationStatistics.setActiveThreadCount(
110 _threadPoolExecutor.getActiveCount());
111 destinationStatistics.setCurrentThreadCount(
112 _threadPoolExecutor.getPoolSize());
113 destinationStatistics.setLargestThreadCount(
114 _threadPoolExecutor.getLargestPoolSize());
115 destinationStatistics.setMaxThreadPoolSize(
116 _threadPoolExecutor.getMaximumPoolSize());
117 destinationStatistics.setMinThreadPoolSize(
118 _threadPoolExecutor.getCorePoolSize());
119 destinationStatistics.setPendingMessageCount(
120 _threadPoolExecutor.getQueue().size());
121 destinationStatistics.setSentMessageCount(
122 _threadPoolExecutor.getCompletedTaskCount());
123
124 return destinationStatistics;
125 }
126
127 public int getMaximumQueueSize() {
128 return _maximumQueueSize;
129 }
130
131 public int getMessageListenerCount() {
132 return _messageListeners.size();
133 }
134
135 public Set<MessageListener> getMessageListeners() {
136 return Collections.unmodifiableSet(_messageListeners);
137 }
138
139 public String getName() {
140 return _name;
141 }
142
143 public int getWorkersCoreSize() {
144 return _workersCoreSize;
145 }
146
147 public int getWorkersMaxSize() {
148 return _workersMaxSize;
149 }
150
151 public boolean isRegistered() {
152 if (getMessageListenerCount() > 0) {
153 return true;
154 }
155 else {
156 return false;
157 }
158 }
159
160 public synchronized void open() {
161 doOpen();
162 }
163
164 public boolean register(MessageListener messageListener) {
165 InvokerMessageListener invokerMessageListener =
166 new InvokerMessageListener(messageListener);
167
168 return registerMessageListener(invokerMessageListener);
169 }
170
171 public boolean register(
172 MessageListener messageListener, ClassLoader classloader) {
173
174 InvokerMessageListener invokerMessageListener =
175 new InvokerMessageListener(messageListener, classloader);
176
177 return registerMessageListener(invokerMessageListener);
178 }
179
180 public void removeDestinationEventListener(
181 DestinationEventListener destinationEventListener) {
182
183 _destinationEventListeners.remove(destinationEventListener);
184 }
185
186 public void removeDestinationEventListeners() {
187 _destinationEventListeners.clear();
188 }
189
190 public void send(Message message) {
191 if (_messageListeners.isEmpty()) {
192 if (_log.isDebugEnabled()) {
193 _log.debug("No message listeners for destination " + getName());
194 }
195
196 return;
197 }
198
199 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
200
201 if (threadPoolExecutor.isShutdown()) {
202 throw new IllegalStateException(
203 "Destination " + getName() + " is shutdown and cannot " +
204 "receive more messages");
205 }
206
207 dispatch(_messageListeners, message);
208 }
209
210 public void setMaximumQueueSize(int maximumQueueSize) {
211 _maximumQueueSize = maximumQueueSize;
212 }
213
214 public void setName(String name) {
215 _name = name;
216 }
217
218 public void setRejectedExecutionHandler(
219 RejectedExecutionHandler rejectedExecutionHandler) {
220
221 _rejectedExecutionHandler = rejectedExecutionHandler;
222 }
223
224 public void setWorkersCoreSize(int workersCoreSize) {
225 _workersCoreSize = workersCoreSize;
226 }
227
228 public void setWorkersMaxSize(int workersMaxSize) {
229 _workersMaxSize = workersMaxSize;
230 }
231
232 public boolean unregister(MessageListener messageListener) {
233 InvokerMessageListener invokerMessageListener =
234 new InvokerMessageListener(messageListener);
235
236 return unregisterMessageListener(invokerMessageListener);
237 }
238
239 public boolean unregister(
240 MessageListener messageListener, ClassLoader classloader) {
241
242 InvokerMessageListener invokerMessageListener =
243 new InvokerMessageListener(messageListener, classloader);
244
245 return unregisterMessageListener(invokerMessageListener);
246 }
247
248 public void unregisterMessageListeners() {
249 for (MessageListener messageListener : _messageListeners) {
250 unregisterMessageListener((InvokerMessageListener)messageListener);
251 }
252 }
253
254 protected RejectedExecutionHandler createRejectionExecutionHandler() {
255 return new RejectedExecutionHandler() {
256
257 public void rejectedExecution(
258 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
259
260 if (!_log.isWarnEnabled()) {
261 return;
262 }
263
264 MessageRunnable messageRunnable =
265 (MessageRunnable)runnable;
266
267 _log.warn(
268 "Discarding message " + messageRunnable.getMessage() +
269 " because it exceeds the maximum queue size of " +
270 _maximumQueueSize);
271 }
272
273 };
274 }
275
276 protected abstract void dispatch(
277 Set<MessageListener> messageListeners, Message message);
278
279 protected void doClose(boolean force) {
280 if (!_threadPoolExecutor.isShutdown() &&
281 !_threadPoolExecutor.isTerminating()) {
282
283 if (!force) {
284 _threadPoolExecutor.shutdown();
285 }
286 else {
287 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
288
289 if (_log.isInfoEnabled()) {
290 _log.info(
291 "The following " + pendingTasks.size() + " tasks " +
292 "were not executed due to shutown: " +
293 pendingTasks);
294 }
295 }
296 }
297 }
298
299 protected void doOpen() {
300 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
301 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
302
303 _threadPoolExecutor = new ThreadPoolExecutor(
304 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
305 new LinkedBlockingQueue<Runnable>(_maximumQueueSize),
306 new NamedThreadFactory(
307 getName(), Thread.NORM_PRIORITY, classLoader));
308
309 if (_rejectedExecutionHandler == null) {
310 _rejectedExecutionHandler = createRejectionExecutionHandler();
311 }
312
313 _threadPoolExecutor.setRejectedExecutionHandler(
314 _rejectedExecutionHandler);
315 }
316 }
317
318 protected void fireMessageListenerRegisteredEvent(
319 MessageListener messageListener) {
320
321 for (DestinationEventListener destinationEventListener :
322 _destinationEventListeners) {
323
324 destinationEventListener.messageListenerRegistered(
325 getName(), messageListener);
326 }
327 }
328
329 protected void fireMessageListenerUnregisteredEvent(
330 MessageListener messageListener) {
331
332 for (DestinationEventListener listener : _destinationEventListeners) {
333 listener.messageListenerUnregistered(getName(), messageListener);
334 }
335 }
336
337 protected ThreadPoolExecutor getThreadPoolExecutor() {
338 return _threadPoolExecutor;
339 }
340
341 protected boolean registerMessageListener(
342 InvokerMessageListener invokerMessageListener) {
343
344 boolean registered = _messageListeners.add(invokerMessageListener);
345
346 if (registered) {
347 fireMessageListenerRegisteredEvent(
348 invokerMessageListener.getMessageListener());
349 }
350
351 return registered;
352 }
353
354 protected boolean unregisterMessageListener(
355 InvokerMessageListener invokerMessageListener) {
356
357 boolean unregistered = _messageListeners.remove(invokerMessageListener);
358
359 if (unregistered) {
360 fireMessageListenerUnregisteredEvent(
361 invokerMessageListener.getMessageListener());
362 }
363
364 return unregistered;
365 }
366
367 private static final int _WORKERS_CORE_SIZE = 2;
368
369 private static final int _WORKERS_MAX_SIZE = 5;
370
371 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
372
373 private Set<DestinationEventListener> _destinationEventListeners =
374 new ConcurrentHashSet<DestinationEventListener>();
375 private int _maximumQueueSize = Integer.MAX_VALUE;
376 private Set<MessageListener> _messageListeners =
377 new ConcurrentHashSet<MessageListener>();
378 private String _name = StringPool.BLANK;
379 private RejectedExecutionHandler _rejectedExecutionHandler;
380 private ThreadPoolExecutor _threadPoolExecutor;
381 private int _workersCoreSize = _WORKERS_CORE_SIZE;
382 private int _workersMaxSize = _WORKERS_MAX_SIZE;
383
384 }