1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.kernel.messaging;
16  
17  import com.liferay.portal.kernel.log.Log;
18  import com.liferay.portal.kernel.log.LogFactoryUtil;
19  import com.liferay.portal.kernel.util.ConcurrentHashSet;
20  import com.liferay.portal.kernel.util.NamedThreadFactory;
21  import com.liferay.portal.kernel.util.StringPool;
22  import com.liferay.portal.kernel.util.Validator;
23  
24  import java.util.List;
25  import java.util.Set;
26  import java.util.concurrent.LinkedBlockingQueue;
27  import java.util.concurrent.ThreadPoolExecutor;
28  import java.util.concurrent.TimeUnit;
29  
30  /**
31   * <a href="BaseDestination.java.html"><b><i>View Source</i></b></a>
32   *
33   * @author Michael C. Han
34   */
35  public abstract class BaseDestination implements Destination {
36  
37      public BaseDestination() {
38      }
39  
40      /**
41       * @deprecated
42       */
43      public BaseDestination(String name) {
44          this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
45      }
46  
47      /**
48       * @deprecated
49       */
50      public BaseDestination(
51          String name, int workersCoreSize, int workersMaxSize) {
52  
53          _name = name;
54          _workersCoreSize = workersCoreSize;
55          _workersMaxSize = workersMaxSize;
56  
57          open();
58      }
59  
60      public void addDestinationEventListener(
61          DestinationEventListener destinationEventListener) {
62  
63          _destinationEventListeners.add(destinationEventListener);
64      }
65  
66      public void afterPropertiesSet() {
67          if (Validator.isNull(_name)) {
68              throw new IllegalArgumentException("Name is null");
69          }
70  
71          open();
72      }
73  
74      public synchronized void close() {
75          close(false);
76      }
77  
78      public synchronized void close(boolean force) {
79          doClose(force);
80      }
81  
82      public void copyDestinationEventListeners(Destination destination) {
83          for (DestinationEventListener destinationEventListener :
84                  _destinationEventListeners) {
85  
86              destination.addDestinationEventListener(
87                  destinationEventListener);
88          }
89      }
90  
91      public void copyMessageListeners(Destination destination) {
92          for (MessageListener messageListener : _messageListeners) {
93              InvokerMessageListener invokerMessageListener =
94                  (InvokerMessageListener)messageListener;
95  
96              destination.register(
97                  invokerMessageListener.getMessageListener(),
98                  invokerMessageListener.getClassLoader());
99          }
100     }
101 
102     public DestinationStatistics getDestinationStatistics() {
103         DestinationStatistics destinationStatistics =
104             new DestinationStatistics();
105 
106         destinationStatistics.setActiveThreadCount(
107             _threadPoolExecutor.getActiveCount());
108         destinationStatistics.setCurrentThreadCount(
109             _threadPoolExecutor.getPoolSize());
110         destinationStatistics.setLargestThreadCount(
111             _threadPoolExecutor.getLargestPoolSize());
112         destinationStatistics.setMaxThreadPoolSize(
113             _threadPoolExecutor.getMaximumPoolSize());
114         destinationStatistics.setMinThreadPoolSize(
115             _threadPoolExecutor.getCorePoolSize());
116         destinationStatistics.setPendingMessageCount(
117             _threadPoolExecutor.getQueue().size());
118         destinationStatistics.setSentMessageCount(
119             _threadPoolExecutor.getCompletedTaskCount());
120 
121         return destinationStatistics;
122     }
123 
124     public int getMaximumQueueSize() {
125         return _maximumQueueSize;
126     }
127 
128     public int getMessageListenerCount() {
129         return _messageListeners.size();
130     }
131 
132     public String getName() {
133         return _name;
134     }
135 
136     public int getWorkersCoreSize() {
137         return _workersCoreSize;
138     }
139 
140     public int getWorkersMaxSize() {
141         return _workersMaxSize;
142     }
143 
144     public boolean isRegistered() {
145         if (getMessageListenerCount() > 0) {
146             return true;
147         }
148         else {
149             return false;
150         }
151     }
152 
153     public synchronized void open() {
154         doOpen();
155     }
156 
157     public boolean register(MessageListener messageListener) {
158         InvokerMessageListener invokerMessageListener =
159             new InvokerMessageListener(messageListener);
160 
161         return registerMessageListener(invokerMessageListener);
162     }
163 
164     public boolean register(
165         MessageListener messageListener, ClassLoader classloader) {
166 
167         InvokerMessageListener invokerMessageListener =
168             new InvokerMessageListener(messageListener, classloader);
169 
170         return registerMessageListener(invokerMessageListener);
171     }
172 
173     public void removeDestinationEventListener(
174         DestinationEventListener destinationEventListener) {
175 
176         _destinationEventListeners.remove(destinationEventListener);
177     }
178 
179     public void removeDestinationEventListeners() {
180         _destinationEventListeners.clear();
181     }
182 
183     public void send(Message message) {
184         if (_messageListeners.isEmpty()) {
185             if (_log.isDebugEnabled()) {
186                 _log.debug("No message listeners for destination " + getName());
187             }
188 
189             return;
190         }
191 
192         ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
193 
194         if (threadPoolExecutor.isShutdown()) {
195             throw new IllegalStateException(
196                 "Destination " + getName() + " is shutdown and cannot " +
197                     "receive more messages");
198         }
199 
200         if ((_maximumQueueSize > -1) &&
201             (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
202 
203             throw new IllegalStateException(
204                 threadPoolExecutor.getQueue().size() +
205                     " messages exceeds the maximum queue size of " +
206                         _maximumQueueSize);
207         }
208 
209         dispatch(_messageListeners, message);
210     }
211 
212     public void setMaximumQueueSize(int maximumQueueSize) {
213         _maximumQueueSize = maximumQueueSize;
214     }
215 
216     public void setName(String name) {
217         _name = name;
218     }
219 
220     public void setWorkersCoreSize(int workersCoreSize) {
221         _workersCoreSize = workersCoreSize;
222     }
223 
224     public void setWorkersMaxSize(int workersMaxSize) {
225         _workersMaxSize = workersMaxSize;
226     }
227 
228     public boolean unregister(MessageListener messageListener) {
229         InvokerMessageListener invokerMessageListener =
230             new InvokerMessageListener(messageListener);
231 
232         return unregisterMessageListener(invokerMessageListener);
233     }
234 
235     public boolean unregister(
236         MessageListener messageListener, ClassLoader classloader) {
237 
238         InvokerMessageListener invokerMessageListener =
239             new InvokerMessageListener(messageListener, classloader);
240 
241         return unregisterMessageListener(invokerMessageListener);
242     }
243 
244     public void unregisterMessageListeners() {
245         for (MessageListener messageListener : _messageListeners) {
246             unregisterMessageListener((InvokerMessageListener)messageListener);
247         }
248     }
249 
250     protected abstract void dispatch(
251         Set<MessageListener> messageListeners, Message message);
252 
253     protected void doClose(boolean force) {
254         if (!_threadPoolExecutor.isShutdown() &&
255             !_threadPoolExecutor.isTerminating()) {
256 
257             if (!force) {
258                 _threadPoolExecutor.shutdown();
259             }
260             else {
261                 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
262 
263                 if (_log.isInfoEnabled()) {
264                     _log.info(
265                         "The following " + pendingTasks.size() + " tasks " +
266                             "were not executed due to shutown: " +
267                                 pendingTasks);
268                 }
269             }
270         }
271     }
272 
273     protected void doOpen() {
274         if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
275             _threadPoolExecutor = new ThreadPoolExecutor(
276                 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
277                 new LinkedBlockingQueue<Runnable>(),
278                 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
279         }
280     }
281 
282     protected void fireMessageListenerRegisteredEvent(
283         MessageListener messageListener) {
284 
285         for (DestinationEventListener destinationEventListener :
286                 _destinationEventListeners) {
287 
288             destinationEventListener.messageListenerRegistered(
289                 getName(), messageListener);
290         }
291     }
292 
293     protected void fireMessageListenerUnregisteredEvent(
294         MessageListener messageListener) {
295 
296         for (DestinationEventListener listener : _destinationEventListeners) {
297             listener.messageListenerUnregistered(getName(), messageListener);
298         }
299     }
300 
301     protected ThreadPoolExecutor getThreadPoolExecutor() {
302         return _threadPoolExecutor;
303     }
304 
305     protected boolean registerMessageListener(
306         InvokerMessageListener invokerMessageListener) {
307 
308         boolean registered = _messageListeners.add(invokerMessageListener);
309 
310         if (registered) {
311             fireMessageListenerRegisteredEvent(
312                 invokerMessageListener.getMessageListener());
313         }
314 
315         return registered;
316     }
317 
318     protected boolean unregisterMessageListener(
319         InvokerMessageListener invokerMessageListener) {
320 
321         boolean unregistered = _messageListeners.remove(invokerMessageListener);
322 
323         if (unregistered) {
324             fireMessageListenerUnregisteredEvent(
325                 invokerMessageListener.getMessageListener());
326         }
327 
328         return unregistered;
329     }
330 
331     private static final int _WORKERS_CORE_SIZE = 2;
332 
333     private static final int _WORKERS_MAX_SIZE = 5;
334 
335     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
336 
337     private Set<DestinationEventListener> _destinationEventListeners =
338         new ConcurrentHashSet<DestinationEventListener>();
339     private int _maximumQueueSize = -1;
340     private Set<MessageListener> _messageListeners =
341         new ConcurrentHashSet<MessageListener>();
342     private String _name = StringPool.BLANK;
343     private ThreadPoolExecutor _threadPoolExecutor;
344     private int _workersCoreSize = _WORKERS_CORE_SIZE;
345     private int _workersMaxSize = _WORKERS_MAX_SIZE;
346 
347 }