1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.kernel.messaging;
16  
17  import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
18  import com.liferay.portal.kernel.log.Log;
19  import com.liferay.portal.kernel.log.LogFactoryUtil;
20  import com.liferay.portal.kernel.util.NamedThreadFactory;
21  import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
22  import com.liferay.portal.kernel.util.StringPool;
23  import com.liferay.portal.kernel.util.Validator;
24  
25  import java.util.Collections;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.concurrent.LinkedBlockingQueue;
29  import java.util.concurrent.RejectedExecutionHandler;
30  import java.util.concurrent.ThreadPoolExecutor;
31  import java.util.concurrent.TimeUnit;
32  
33  /**
34   * <a href="BaseDestination.java.html"><b><i>View Source</i></b></a>
35   *
36   * @author Michael C. Han
37   */
38  public abstract class BaseDestination implements Destination {
39  
40      public BaseDestination() {
41      }
42  
43      /**
44       * @deprecated
45       */
46      public BaseDestination(String name) {
47          this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
48      }
49  
50      /**
51       * @deprecated
52       */
53      public BaseDestination(
54          String name, int workersCoreSize, int workersMaxSize) {
55  
56          _name = name;
57          _workersCoreSize = workersCoreSize;
58          _workersMaxSize = workersMaxSize;
59  
60          open();
61      }
62  
63      public void addDestinationEventListener(
64          DestinationEventListener destinationEventListener) {
65  
66          _destinationEventListeners.add(destinationEventListener);
67      }
68  
69      public void afterPropertiesSet() {
70          if (Validator.isNull(_name)) {
71              throw new IllegalArgumentException("Name is null");
72          }
73  
74          open();
75      }
76  
77      public synchronized void close() {
78          close(false);
79      }
80  
81      public synchronized void close(boolean force) {
82          doClose(force);
83      }
84  
85      public void copyDestinationEventListeners(Destination destination) {
86          for (DestinationEventListener destinationEventListener :
87                  _destinationEventListeners) {
88  
89              destination.addDestinationEventListener(
90                  destinationEventListener);
91          }
92      }
93  
94      public void copyMessageListeners(Destination destination) {
95          for (MessageListener messageListener : _messageListeners) {
96              InvokerMessageListener invokerMessageListener =
97                  (InvokerMessageListener)messageListener;
98  
99              destination.register(
100                 invokerMessageListener.getMessageListener(),
101                 invokerMessageListener.getClassLoader());
102         }
103     }
104 
105     public DestinationStatistics getDestinationStatistics() {
106         DestinationStatistics destinationStatistics =
107             new DestinationStatistics();
108 
109         destinationStatistics.setActiveThreadCount(
110             _threadPoolExecutor.getActiveCount());
111         destinationStatistics.setCurrentThreadCount(
112             _threadPoolExecutor.getPoolSize());
113         destinationStatistics.setLargestThreadCount(
114             _threadPoolExecutor.getLargestPoolSize());
115         destinationStatistics.setMaxThreadPoolSize(
116             _threadPoolExecutor.getMaximumPoolSize());
117         destinationStatistics.setMinThreadPoolSize(
118             _threadPoolExecutor.getCorePoolSize());
119         destinationStatistics.setPendingMessageCount(
120             _threadPoolExecutor.getQueue().size());
121         destinationStatistics.setSentMessageCount(
122             _threadPoolExecutor.getCompletedTaskCount());
123 
124         return destinationStatistics;
125     }
126 
127     public int getMaximumQueueSize() {
128         return _maximumQueueSize;
129     }
130 
131     public int getMessageListenerCount() {
132         return _messageListeners.size();
133     }
134 
135     public Set<MessageListener> getMessageListeners() {
136         return Collections.unmodifiableSet(_messageListeners);
137     }
138 
139     public String getName() {
140         return _name;
141     }
142 
143     public int getWorkersCoreSize() {
144         return _workersCoreSize;
145     }
146 
147     public int getWorkersMaxSize() {
148         return _workersMaxSize;
149     }
150 
151     public boolean isRegistered() {
152         if (getMessageListenerCount() > 0) {
153             return true;
154         }
155         else {
156             return false;
157         }
158     }
159 
160     public synchronized void open() {
161         doOpen();
162     }
163 
164     public boolean register(MessageListener messageListener) {
165         InvokerMessageListener invokerMessageListener =
166             new InvokerMessageListener(messageListener);
167 
168         return registerMessageListener(invokerMessageListener);
169     }
170 
171     public boolean register(
172         MessageListener messageListener, ClassLoader classloader) {
173 
174         InvokerMessageListener invokerMessageListener =
175             new InvokerMessageListener(messageListener, classloader);
176 
177         return registerMessageListener(invokerMessageListener);
178     }
179 
180     public void removeDestinationEventListener(
181         DestinationEventListener destinationEventListener) {
182 
183         _destinationEventListeners.remove(destinationEventListener);
184     }
185 
186     public void removeDestinationEventListeners() {
187         _destinationEventListeners.clear();
188     }
189 
190     public void send(Message message) {
191         if (_messageListeners.isEmpty()) {
192             if (_log.isDebugEnabled()) {
193                 _log.debug("No message listeners for destination " + getName());
194             }
195 
196             return;
197         }
198 
199         ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
200 
201         if (threadPoolExecutor.isShutdown()) {
202             throw new IllegalStateException(
203                 "Destination " + getName() + " is shutdown and cannot " +
204                     "receive more messages");
205         }
206 
207         dispatch(_messageListeners, message);
208     }
209 
210     public void setMaximumQueueSize(int maximumQueueSize) {
211         _maximumQueueSize = maximumQueueSize;
212     }
213 
214     public void setName(String name) {
215         _name = name;
216     }
217 
218     public void setRejectedExecutionHandler(
219         RejectedExecutionHandler rejectedExecutionHandler) {
220 
221         _rejectedExecutionHandler = rejectedExecutionHandler;
222     }
223 
224     public void setWorkersCoreSize(int workersCoreSize) {
225         _workersCoreSize = workersCoreSize;
226     }
227 
228     public void setWorkersMaxSize(int workersMaxSize) {
229         _workersMaxSize = workersMaxSize;
230     }
231 
232     public boolean unregister(MessageListener messageListener) {
233         InvokerMessageListener invokerMessageListener =
234             new InvokerMessageListener(messageListener);
235 
236         return unregisterMessageListener(invokerMessageListener);
237     }
238 
239     public boolean unregister(
240         MessageListener messageListener, ClassLoader classloader) {
241 
242         InvokerMessageListener invokerMessageListener =
243             new InvokerMessageListener(messageListener, classloader);
244 
245         return unregisterMessageListener(invokerMessageListener);
246     }
247 
248     public void unregisterMessageListeners() {
249         for (MessageListener messageListener : _messageListeners) {
250             unregisterMessageListener((InvokerMessageListener)messageListener);
251         }
252     }
253 
254     protected RejectedExecutionHandler createRejectionExecutionHandler() {
255         return new RejectedExecutionHandler() {
256 
257             public void rejectedExecution(
258                 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
259 
260                 if (!_log.isWarnEnabled()) {
261                     return;
262                 }
263 
264                 MessageRunnable messageRunnable =
265                     (MessageRunnable)runnable;
266 
267                 _log.warn(
268                     "Discarding message " + messageRunnable.getMessage() +
269                         " because it exceeds the maximum queue size of " +
270                             _maximumQueueSize);
271             }
272 
273         };
274     }
275 
276     protected abstract void dispatch(
277         Set<MessageListener> messageListeners, Message message);
278 
279     protected void doClose(boolean force) {
280         if (!_threadPoolExecutor.isShutdown() &&
281             !_threadPoolExecutor.isTerminating()) {
282 
283             if (!force) {
284                 _threadPoolExecutor.shutdown();
285             }
286             else {
287                 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
288 
289                 if (_log.isInfoEnabled()) {
290                     _log.info(
291                         "The following " + pendingTasks.size() + " tasks " +
292                             "were not executed due to shutown: " +
293                                 pendingTasks);
294                 }
295             }
296         }
297     }
298 
299     protected void doOpen() {
300         if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
301             ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
302 
303             _threadPoolExecutor = new ThreadPoolExecutor(
304                 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
305                 new LinkedBlockingQueue<Runnable>(_maximumQueueSize),
306                 new NamedThreadFactory(
307                     getName(), Thread.NORM_PRIORITY, classLoader));
308 
309             if (_rejectedExecutionHandler == null) {
310                 _rejectedExecutionHandler = createRejectionExecutionHandler();
311             }
312 
313             _threadPoolExecutor.setRejectedExecutionHandler(
314                 _rejectedExecutionHandler);
315         }
316     }
317 
318     protected void fireMessageListenerRegisteredEvent(
319         MessageListener messageListener) {
320 
321         for (DestinationEventListener destinationEventListener :
322                 _destinationEventListeners) {
323 
324             destinationEventListener.messageListenerRegistered(
325                 getName(), messageListener);
326         }
327     }
328 
329     protected void fireMessageListenerUnregisteredEvent(
330         MessageListener messageListener) {
331 
332         for (DestinationEventListener listener : _destinationEventListeners) {
333             listener.messageListenerUnregistered(getName(), messageListener);
334         }
335     }
336 
337     protected ThreadPoolExecutor getThreadPoolExecutor() {
338         return _threadPoolExecutor;
339     }
340 
341     protected boolean registerMessageListener(
342         InvokerMessageListener invokerMessageListener) {
343 
344         boolean registered = _messageListeners.add(invokerMessageListener);
345 
346         if (registered) {
347             fireMessageListenerRegisteredEvent(
348                 invokerMessageListener.getMessageListener());
349         }
350 
351         return registered;
352     }
353 
354     protected boolean unregisterMessageListener(
355         InvokerMessageListener invokerMessageListener) {
356 
357         boolean unregistered = _messageListeners.remove(invokerMessageListener);
358 
359         if (unregistered) {
360             fireMessageListenerUnregisteredEvent(
361                 invokerMessageListener.getMessageListener());
362         }
363 
364         return unregistered;
365     }
366 
367     private static final int _WORKERS_CORE_SIZE = 2;
368 
369     private static final int _WORKERS_MAX_SIZE = 5;
370 
371     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
372 
373     private Set<DestinationEventListener> _destinationEventListeners =
374         new ConcurrentHashSet<DestinationEventListener>();
375     private int _maximumQueueSize = Integer.MAX_VALUE;
376     private Set<MessageListener> _messageListeners =
377         new ConcurrentHashSet<MessageListener>();
378     private String _name = StringPool.BLANK;
379     private RejectedExecutionHandler _rejectedExecutionHandler;
380     private ThreadPoolExecutor _threadPoolExecutor;
381     private int _workersCoreSize = _WORKERS_CORE_SIZE;
382     private int _workersMaxSize = _WORKERS_MAX_SIZE;
383 
384 }