001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.util.ConcurrentHashSet;
020 import com.liferay.portal.kernel.util.NamedThreadFactory;
021 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
022 import com.liferay.portal.kernel.util.StringPool;
023 import com.liferay.portal.kernel.util.Validator;
024
025 import java.util.List;
026 import java.util.Set;
027 import java.util.concurrent.LinkedBlockingQueue;
028 import java.util.concurrent.RejectedExecutionHandler;
029 import java.util.concurrent.ThreadPoolExecutor;
030 import java.util.concurrent.TimeUnit;
031
032
035 public abstract class BaseDestination implements Destination {
036
037 public BaseDestination() {
038 }
039
040
043 public BaseDestination(String name) {
044 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
045 }
046
047
050 public BaseDestination(
051 String name, int workersCoreSize, int workersMaxSize) {
052
053 _name = name;
054 _workersCoreSize = workersCoreSize;
055 _workersMaxSize = workersMaxSize;
056
057 open();
058 }
059
060 public void addDestinationEventListener(
061 DestinationEventListener destinationEventListener) {
062
063 _destinationEventListeners.add(destinationEventListener);
064 }
065
066 public void afterPropertiesSet() {
067 if (Validator.isNull(_name)) {
068 throw new IllegalArgumentException("Name is null");
069 }
070
071 open();
072 }
073
074 public synchronized void close() {
075 close(false);
076 }
077
078 public synchronized void close(boolean force) {
079 doClose(force);
080 }
081
082 public void copyDestinationEventListeners(Destination destination) {
083 for (DestinationEventListener destinationEventListener :
084 _destinationEventListeners) {
085
086 destination.addDestinationEventListener(
087 destinationEventListener);
088 }
089 }
090
091 public void copyMessageListeners(Destination destination) {
092 for (MessageListener messageListener : _messageListeners) {
093 InvokerMessageListener invokerMessageListener =
094 (InvokerMessageListener)messageListener;
095
096 destination.register(
097 invokerMessageListener.getMessageListener(),
098 invokerMessageListener.getClassLoader());
099 }
100 }
101
102 public DestinationStatistics getDestinationStatistics() {
103 DestinationStatistics destinationStatistics =
104 new DestinationStatistics();
105
106 destinationStatistics.setActiveThreadCount(
107 _threadPoolExecutor.getActiveCount());
108 destinationStatistics.setCurrentThreadCount(
109 _threadPoolExecutor.getPoolSize());
110 destinationStatistics.setLargestThreadCount(
111 _threadPoolExecutor.getLargestPoolSize());
112 destinationStatistics.setMaxThreadPoolSize(
113 _threadPoolExecutor.getMaximumPoolSize());
114 destinationStatistics.setMinThreadPoolSize(
115 _threadPoolExecutor.getCorePoolSize());
116 destinationStatistics.setPendingMessageCount(
117 _threadPoolExecutor.getQueue().size());
118 destinationStatistics.setSentMessageCount(
119 _threadPoolExecutor.getCompletedTaskCount());
120
121 return destinationStatistics;
122 }
123
124 public int getMaximumQueueSize() {
125 return _maximumQueueSize;
126 }
127
128 public int getMessageListenerCount() {
129 return _messageListeners.size();
130 }
131
132 public String getName() {
133 return _name;
134 }
135
136 public int getWorkersCoreSize() {
137 return _workersCoreSize;
138 }
139
140 public int getWorkersMaxSize() {
141 return _workersMaxSize;
142 }
143
144 public boolean isRegistered() {
145 if (getMessageListenerCount() > 0) {
146 return true;
147 }
148 else {
149 return false;
150 }
151 }
152
153 public synchronized void open() {
154 doOpen();
155 }
156
157 public boolean register(MessageListener messageListener) {
158 InvokerMessageListener invokerMessageListener =
159 new InvokerMessageListener(messageListener);
160
161 return registerMessageListener(invokerMessageListener);
162 }
163
164 public boolean register(
165 MessageListener messageListener, ClassLoader classloader) {
166
167 InvokerMessageListener invokerMessageListener =
168 new InvokerMessageListener(messageListener, classloader);
169
170 return registerMessageListener(invokerMessageListener);
171 }
172
173 public void removeDestinationEventListener(
174 DestinationEventListener destinationEventListener) {
175
176 _destinationEventListeners.remove(destinationEventListener);
177 }
178
179 public void removeDestinationEventListeners() {
180 _destinationEventListeners.clear();
181 }
182
183 public void send(Message message) {
184 if (_messageListeners.isEmpty()) {
185 if (_log.isDebugEnabled()) {
186 _log.debug("No message listeners for destination " + getName());
187 }
188
189 return;
190 }
191
192 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
193
194 if (threadPoolExecutor.isShutdown()) {
195 throw new IllegalStateException(
196 "Destination " + getName() + " is shutdown and cannot " +
197 "receive more messages");
198 }
199
200 dispatch(_messageListeners, message);
201 }
202
203 public void setMaximumQueueSize(int maximumQueueSize) {
204 _maximumQueueSize = maximumQueueSize;
205 }
206
207 public void setName(String name) {
208 _name = name;
209 }
210
211 public void setRejectedExecutionHandler(
212 RejectedExecutionHandler rejectedExecutionHandler) {
213
214 _rejectedExecutionHandler = rejectedExecutionHandler;
215 }
216
217 public void setWorkersCoreSize(int workersCoreSize) {
218 _workersCoreSize = workersCoreSize;
219 }
220
221 public void setWorkersMaxSize(int workersMaxSize) {
222 _workersMaxSize = workersMaxSize;
223 }
224
225 public boolean unregister(MessageListener messageListener) {
226 InvokerMessageListener invokerMessageListener =
227 new InvokerMessageListener(messageListener);
228
229 return unregisterMessageListener(invokerMessageListener);
230 }
231
232 public boolean unregister(
233 MessageListener messageListener, ClassLoader classloader) {
234
235 InvokerMessageListener invokerMessageListener =
236 new InvokerMessageListener(messageListener, classloader);
237
238 return unregisterMessageListener(invokerMessageListener);
239 }
240
241 public void unregisterMessageListeners() {
242 for (MessageListener messageListener : _messageListeners) {
243 unregisterMessageListener((InvokerMessageListener)messageListener);
244 }
245 }
246
247 protected RejectedExecutionHandler createRejectionExecutionHandler() {
248 return new RejectedExecutionHandler() {
249
250 public void rejectedExecution(
251 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
252
253 if (!_log.isWarnEnabled()) {
254 return;
255 }
256
257 MessageRunnable messageRunnable =
258 (MessageRunnable)runnable;
259
260 _log.warn(
261 "Discarding message " + messageRunnable.getMessage() +
262 " because it exceeds the maximum queue size of " +
263 _maximumQueueSize);
264 }
265
266 };
267 }
268
269 protected abstract void dispatch(
270 Set<MessageListener> messageListeners, Message message);
271
272 protected void doClose(boolean force) {
273 if (!_threadPoolExecutor.isShutdown() &&
274 !_threadPoolExecutor.isTerminating()) {
275
276 if (!force) {
277 _threadPoolExecutor.shutdown();
278 }
279 else {
280 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
281
282 if (_log.isInfoEnabled()) {
283 _log.info(
284 "The following " + pendingTasks.size() + " tasks " +
285 "were not executed due to shutown: " +
286 pendingTasks);
287 }
288 }
289 }
290 }
291
292 protected void doOpen() {
293 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
294 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
295
296 _threadPoolExecutor = new ThreadPoolExecutor(
297 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
298 new LinkedBlockingQueue<Runnable>(_maximumQueueSize),
299 new NamedThreadFactory(
300 getName(), Thread.NORM_PRIORITY, classLoader));
301
302 if (_rejectedExecutionHandler == null) {
303 _rejectedExecutionHandler = createRejectionExecutionHandler();
304 }
305
306 _threadPoolExecutor.setRejectedExecutionHandler(
307 _rejectedExecutionHandler);
308 }
309 }
310
311 protected void fireMessageListenerRegisteredEvent(
312 MessageListener messageListener) {
313
314 for (DestinationEventListener destinationEventListener :
315 _destinationEventListeners) {
316
317 destinationEventListener.messageListenerRegistered(
318 getName(), messageListener);
319 }
320 }
321
322 protected void fireMessageListenerUnregisteredEvent(
323 MessageListener messageListener) {
324
325 for (DestinationEventListener listener : _destinationEventListeners) {
326 listener.messageListenerUnregistered(getName(), messageListener);
327 }
328 }
329
330 protected ThreadPoolExecutor getThreadPoolExecutor() {
331 return _threadPoolExecutor;
332 }
333
334 protected boolean registerMessageListener(
335 InvokerMessageListener invokerMessageListener) {
336
337 boolean registered = _messageListeners.add(invokerMessageListener);
338
339 if (registered) {
340 fireMessageListenerRegisteredEvent(
341 invokerMessageListener.getMessageListener());
342 }
343
344 return registered;
345 }
346
347 protected boolean unregisterMessageListener(
348 InvokerMessageListener invokerMessageListener) {
349
350 boolean unregistered = _messageListeners.remove(invokerMessageListener);
351
352 if (unregistered) {
353 fireMessageListenerUnregisteredEvent(
354 invokerMessageListener.getMessageListener());
355 }
356
357 return unregistered;
358 }
359
360 private static final int _WORKERS_CORE_SIZE = 2;
361
362 private static final int _WORKERS_MAX_SIZE = 5;
363
364 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
365
366 private Set<DestinationEventListener> _destinationEventListeners =
367 new ConcurrentHashSet<DestinationEventListener>();
368 private int _maximumQueueSize = Integer.MAX_VALUE;
369 private Set<MessageListener> _messageListeners =
370 new ConcurrentHashSet<MessageListener>();
371 private String _name = StringPool.BLANK;
372 private RejectedExecutionHandler _rejectedExecutionHandler;
373 private ThreadPoolExecutor _threadPoolExecutor;
374 private int _workersCoreSize = _WORKERS_CORE_SIZE;
375 private int _workersMaxSize = _WORKERS_MAX_SIZE;
376
377 }