1
14
15 package com.liferay.portal.kernel.messaging;
16
17 import com.liferay.portal.kernel.log.Log;
18 import com.liferay.portal.kernel.log.LogFactoryUtil;
19 import com.liferay.portal.kernel.util.ConcurrentHashSet;
20 import com.liferay.portal.kernel.util.NamedThreadFactory;
21 import com.liferay.portal.kernel.util.StringPool;
22 import com.liferay.portal.kernel.util.Validator;
23
24 import java.util.List;
25 import java.util.Set;
26 import java.util.concurrent.LinkedBlockingQueue;
27 import java.util.concurrent.ThreadPoolExecutor;
28 import java.util.concurrent.TimeUnit;
29
30
35 public abstract class BaseDestination implements Destination {
36
37 public BaseDestination() {
38 }
39
40
43 public BaseDestination(String name) {
44 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
45 }
46
47
50 public BaseDestination(
51 String name, int workersCoreSize, int workersMaxSize) {
52
53 _name = name;
54 _workersCoreSize = workersCoreSize;
55 _workersMaxSize = workersMaxSize;
56
57 open();
58 }
59
60 public void addDestinationEventListener(
61 DestinationEventListener destinationEventListener) {
62
63 _destinationEventListeners.add(destinationEventListener);
64 }
65
66 public void afterPropertiesSet() {
67 if (Validator.isNull(_name)) {
68 throw new IllegalArgumentException("Name is null");
69 }
70
71 open();
72 }
73
74 public synchronized void close() {
75 close(false);
76 }
77
78 public synchronized void close(boolean force) {
79 doClose(force);
80 }
81
82 public void copyDestinationEventListeners(Destination destination) {
83 for (DestinationEventListener destinationEventListener :
84 _destinationEventListeners) {
85
86 destination.addDestinationEventListener(
87 destinationEventListener);
88 }
89 }
90
91 public void copyMessageListeners(Destination destination) {
92 for (MessageListener messageListener : _messageListeners) {
93 InvokerMessageListener invokerMessageListener =
94 (InvokerMessageListener)messageListener;
95
96 destination.register(
97 invokerMessageListener.getMessageListener(),
98 invokerMessageListener.getClassLoader());
99 }
100 }
101
102 public DestinationStatistics getDestinationStatistics() {
103 DestinationStatistics destinationStatistics =
104 new DestinationStatistics();
105
106 destinationStatistics.setActiveThreadCount(
107 _threadPoolExecutor.getActiveCount());
108 destinationStatistics.setCurrentThreadCount(
109 _threadPoolExecutor.getPoolSize());
110 destinationStatistics.setLargestThreadCount(
111 _threadPoolExecutor.getLargestPoolSize());
112 destinationStatistics.setMaxThreadPoolSize(
113 _threadPoolExecutor.getMaximumPoolSize());
114 destinationStatistics.setMinThreadPoolSize(
115 _threadPoolExecutor.getCorePoolSize());
116 destinationStatistics.setPendingMessageCount(
117 _threadPoolExecutor.getQueue().size());
118 destinationStatistics.setSentMessageCount(
119 _threadPoolExecutor.getCompletedTaskCount());
120
121 return destinationStatistics;
122 }
123
124 public int getMaximumQueueSize() {
125 return _maximumQueueSize;
126 }
127
128 public int getMessageListenerCount() {
129 return _messageListeners.size();
130 }
131
132 public String getName() {
133 return _name;
134 }
135
136 public int getWorkersCoreSize() {
137 return _workersCoreSize;
138 }
139
140 public int getWorkersMaxSize() {
141 return _workersMaxSize;
142 }
143
144 public boolean isRegistered() {
145 if (getMessageListenerCount() > 0) {
146 return true;
147 }
148 else {
149 return false;
150 }
151 }
152
153 public synchronized void open() {
154 doOpen();
155 }
156
157 public boolean register(MessageListener messageListener) {
158 InvokerMessageListener invokerMessageListener =
159 new InvokerMessageListener(messageListener);
160
161 return registerMessageListener(invokerMessageListener);
162 }
163
164 public boolean register(
165 MessageListener messageListener, ClassLoader classloader) {
166
167 InvokerMessageListener invokerMessageListener =
168 new InvokerMessageListener(messageListener, classloader);
169
170 return registerMessageListener(invokerMessageListener);
171 }
172
173 public void removeDestinationEventListener(
174 DestinationEventListener destinationEventListener) {
175
176 _destinationEventListeners.remove(destinationEventListener);
177 }
178
179 public void removeDestinationEventListeners() {
180 _destinationEventListeners.clear();
181 }
182
183 public void send(Message message) {
184 if (_messageListeners.isEmpty()) {
185 if (_log.isDebugEnabled()) {
186 _log.debug("No message listeners for destination " + getName());
187 }
188
189 return;
190 }
191
192 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
193
194 if (threadPoolExecutor.isShutdown()) {
195 throw new IllegalStateException(
196 "Destination " + getName() + " is shutdown and cannot " +
197 "receive more messages");
198 }
199
200 if ((_maximumQueueSize > -1) &&
201 (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
202
203 throw new IllegalStateException(
204 threadPoolExecutor.getQueue().size() +
205 " messages exceeds the maximum queue size of " +
206 _maximumQueueSize);
207 }
208
209 dispatch(_messageListeners, message);
210 }
211
212 public void setMaximumQueueSize(int maximumQueueSize) {
213 _maximumQueueSize = maximumQueueSize;
214 }
215
216 public void setName(String name) {
217 _name = name;
218 }
219
220 public void setWorkersCoreSize(int workersCoreSize) {
221 _workersCoreSize = workersCoreSize;
222 }
223
224 public void setWorkersMaxSize(int workersMaxSize) {
225 _workersMaxSize = workersMaxSize;
226 }
227
228 public boolean unregister(MessageListener messageListener) {
229 InvokerMessageListener invokerMessageListener =
230 new InvokerMessageListener(messageListener);
231
232 return unregisterMessageListener(invokerMessageListener);
233 }
234
235 public boolean unregister(
236 MessageListener messageListener, ClassLoader classloader) {
237
238 InvokerMessageListener invokerMessageListener =
239 new InvokerMessageListener(messageListener, classloader);
240
241 return unregisterMessageListener(invokerMessageListener);
242 }
243
244 public void unregisterMessageListeners() {
245 for (MessageListener messageListener : _messageListeners) {
246 unregisterMessageListener((InvokerMessageListener)messageListener);
247 }
248 }
249
250 protected abstract void dispatch(
251 Set<MessageListener> messageListeners, Message message);
252
253 protected void doClose(boolean force) {
254 if (!_threadPoolExecutor.isShutdown() &&
255 !_threadPoolExecutor.isTerminating()) {
256
257 if (!force) {
258 _threadPoolExecutor.shutdown();
259 }
260 else {
261 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
262
263 if (_log.isInfoEnabled()) {
264 _log.info(
265 "The following " + pendingTasks.size() + " tasks " +
266 "were not executed due to shutown: " +
267 pendingTasks);
268 }
269 }
270 }
271 }
272
273 protected void doOpen() {
274 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
275 _threadPoolExecutor = new ThreadPoolExecutor(
276 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
277 new LinkedBlockingQueue<Runnable>(),
278 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
279 }
280 }
281
282 protected void fireMessageListenerRegisteredEvent(
283 MessageListener messageListener) {
284
285 for (DestinationEventListener destinationEventListener :
286 _destinationEventListeners) {
287
288 destinationEventListener.messageListenerRegistered(
289 getName(), messageListener);
290 }
291 }
292
293 protected void fireMessageListenerUnregisteredEvent(
294 MessageListener messageListener) {
295
296 for (DestinationEventListener listener : _destinationEventListeners) {
297 listener.messageListenerUnregistered(getName(), messageListener);
298 }
299 }
300
301 protected ThreadPoolExecutor getThreadPoolExecutor() {
302 return _threadPoolExecutor;
303 }
304
305 protected boolean registerMessageListener(
306 InvokerMessageListener invokerMessageListener) {
307
308 boolean registered = _messageListeners.add(invokerMessageListener);
309
310 if (registered) {
311 fireMessageListenerRegisteredEvent(
312 invokerMessageListener.getMessageListener());
313 }
314
315 return registered;
316 }
317
318 protected boolean unregisterMessageListener(
319 InvokerMessageListener invokerMessageListener) {
320
321 boolean unregistered = _messageListeners.remove(invokerMessageListener);
322
323 if (unregistered) {
324 fireMessageListenerUnregisteredEvent(
325 invokerMessageListener.getMessageListener());
326 }
327
328 return unregistered;
329 }
330
331 private static final int _WORKERS_CORE_SIZE = 2;
332
333 private static final int _WORKERS_MAX_SIZE = 5;
334
335 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
336
337 private Set<DestinationEventListener> _destinationEventListeners =
338 new ConcurrentHashSet<DestinationEventListener>();
339 private int _maximumQueueSize = -1;
340 private Set<MessageListener> _messageListeners =
341 new ConcurrentHashSet<MessageListener>();
342 private String _name = StringPool.BLANK;
343 private ThreadPoolExecutor _threadPoolExecutor;
344 private int _workersCoreSize = _WORKERS_CORE_SIZE;
345 private int _workersMaxSize = _WORKERS_MAX_SIZE;
346
347 }