1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.kernel.messaging;
24  
25  import com.liferay.portal.kernel.log.Log;
26  import com.liferay.portal.kernel.log.LogFactoryUtil;
27  import com.liferay.portal.kernel.util.ConcurrentHashSet;
28  import com.liferay.portal.kernel.util.NamedThreadFactory;
29  
30  import java.util.List;
31  import java.util.Set;
32  import java.util.concurrent.LinkedBlockingQueue;
33  import java.util.concurrent.ThreadPoolExecutor;
34  import java.util.concurrent.TimeUnit;
35  
36  /**
37   * <a href="BaseDestination.java.html"><b><i>View Source</i></b></a>
38   *
39   * @author Michael C. Han
40   */
41  public abstract class BaseDestination implements Destination {
42  
43      public BaseDestination(String name) {
44          this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
45      }
46  
47      public BaseDestination(
48          String name, int workersCoreSize, int workersMaxSize) {
49  
50          _name = name;
51          _workersCoreSize = workersCoreSize;
52          _workersMaxSize = workersMaxSize;
53  
54          open();
55      }
56  
57      public void addDestinationEventListener(
58          DestinationEventListener destinationEventListener) {
59  
60          _destinationEventListeners.add(destinationEventListener);
61      }
62  
63      public synchronized void close() {
64          close(false);
65      }
66  
67      public synchronized void close(boolean force) {
68          doClose(force);
69      }
70  
71      public void copyDestinationEventListeners(Destination destination) {
72          for (DestinationEventListener destinationEventListener :
73                  _destinationEventListeners) {
74  
75              destination.addDestinationEventListener(
76                  destinationEventListener);
77          }
78      }
79  
80      public void copyMessageListeners(Destination destination) {
81          for (MessageListener messageListener : _messageListeners) {
82              InvokerMessageListener invokerMessageListener =
83                  (InvokerMessageListener)messageListener;
84  
85              destination.register(
86                  invokerMessageListener.getMessageListener(),
87                  invokerMessageListener.getClassLoader());
88          }
89      }
90  
91      public DestinationStatistics getDestinationStatistics() {
92          DestinationStatistics destinationStatistics =
93              new DestinationStatistics();
94  
95          destinationStatistics.setActiveThreadCount(
96              _threadPoolExecutor.getActiveCount());
97          destinationStatistics.setCurrentThreadCount(
98              _threadPoolExecutor.getPoolSize());
99          destinationStatistics.setLargestThreadCount(
100             _threadPoolExecutor.getLargestPoolSize());
101         destinationStatistics.setMaxThreadPoolSize(
102             _threadPoolExecutor.getMaximumPoolSize());
103         destinationStatistics.setMinThreadPoolSize(
104             _threadPoolExecutor.getCorePoolSize());
105         destinationStatistics.setPendingMessageCount(
106             _threadPoolExecutor.getQueue().size());
107         destinationStatistics.setSentMessageCount(
108             _threadPoolExecutor.getCompletedTaskCount());
109 
110         return destinationStatistics;
111     }
112 
113     public int getMessageListenerCount() {
114         return _messageListeners.size();
115     }
116 
117     public String getName() {
118         return _name;
119     }
120 
121     public boolean isRegistered() {
122         if (getMessageListenerCount() > 0) {
123             return true;
124         }
125         else {
126             return false;
127         }
128     }
129 
130     public synchronized void open() {
131         doOpen();
132     }
133 
134     public boolean register(MessageListener messageListener) {
135         InvokerMessageListener invokerMessageListener =
136             new InvokerMessageListener(messageListener);
137 
138         return registerMessageListener(invokerMessageListener);
139     }
140 
141     public boolean register(
142         MessageListener messageListener, ClassLoader classloader) {
143 
144         InvokerMessageListener invokerMessageListener =
145             new InvokerMessageListener(messageListener, classloader);
146 
147         return registerMessageListener(invokerMessageListener);
148     }
149 
150     public void removeDestinationEventListener(
151         DestinationEventListener destinationEventListener) {
152 
153         _destinationEventListeners.remove(destinationEventListener);
154     }
155 
156     public void removeDestinationEventListeners() {
157         _destinationEventListeners.clear();
158     }
159 
160     public void send(Message message) {
161         if (_messageListeners.isEmpty()) {
162             if (_log.isDebugEnabled()) {
163                 _log.debug("No message listeners for destination " + getName());
164             }
165 
166             return;
167         }
168 
169         ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
170 
171         if (threadPoolExecutor.isShutdown()) {
172             throw new IllegalStateException(
173                 "Destination " + getName() + " is shutdown and cannot " +
174                     "receive more messages");
175         }
176 
177         dispatch(_messageListeners, message);
178     }
179 
180     public boolean unregister(MessageListener messageListener) {
181         InvokerMessageListener invokerMessageListener =
182             new InvokerMessageListener(messageListener);
183 
184         return unregisterMessageListener(invokerMessageListener);
185     }
186 
187     public boolean unregister(
188         MessageListener messageListener, ClassLoader classloader) {
189 
190         InvokerMessageListener invokerMessageListener =
191             new InvokerMessageListener(messageListener, classloader);
192 
193         return unregisterMessageListener(invokerMessageListener);
194     }
195 
196     public void unregisterMessageListeners() {
197         for (MessageListener messageListener : _messageListeners) {
198             unregisterMessageListener((InvokerMessageListener)messageListener);
199         }
200     }
201 
202     protected abstract void dispatch(
203         Set<MessageListener> messageListeners, Message message);
204 
205     protected void doClose(boolean force) {
206         if (!_threadPoolExecutor.isShutdown() &&
207             !_threadPoolExecutor.isTerminating()) {
208 
209             if (!force) {
210                 _threadPoolExecutor.shutdown();
211             }
212             else {
213                 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
214 
215                 if (_log.isInfoEnabled()) {
216                     _log.info(
217                         "The following " + pendingTasks.size() + " tasks " +
218                             "were not executed due to shutown: " +
219                                 pendingTasks);
220                 }
221             }
222         }
223     }
224 
225     protected void doOpen() {
226         if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
227             _threadPoolExecutor = new ThreadPoolExecutor(
228                 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
229                 new LinkedBlockingQueue<Runnable>(),
230                 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
231         }
232     }
233 
234     protected void fireMessageListenerRegisteredEvent(
235         MessageListener messageListener) {
236 
237         for (DestinationEventListener destinationEventListener :
238                 _destinationEventListeners) {
239 
240             destinationEventListener.messageListenerRegistered(
241                 getName(), messageListener);
242         }
243     }
244 
245     protected void fireMessageListenerUnregisteredEvent(
246         MessageListener messageListener) {
247 
248         for (DestinationEventListener listener : _destinationEventListeners) {
249             listener.messageListenerUnregistered(getName(), messageListener);
250         }
251     }
252 
253     protected ThreadPoolExecutor getThreadPoolExecutor() {
254         return _threadPoolExecutor;
255     }
256 
257     protected int getWorkersCoreSize() {
258         return _workersCoreSize;
259     }
260 
261     protected int getWorkersMaxSize() {
262         return _workersMaxSize;
263     }
264 
265     protected boolean registerMessageListener(
266         InvokerMessageListener invokerMessageListener) {
267 
268         boolean registered = _messageListeners.add(invokerMessageListener);
269 
270         if (registered) {
271             fireMessageListenerRegisteredEvent(
272                 invokerMessageListener.getMessageListener());
273         }
274 
275         return registered;
276     }
277 
278     protected boolean unregisterMessageListener(
279         InvokerMessageListener invokerMessageListener) {
280 
281         boolean unregistered = _messageListeners.remove(invokerMessageListener);
282 
283         if (unregistered) {
284             fireMessageListenerUnregisteredEvent(
285                 invokerMessageListener.getMessageListener());
286         }
287 
288         return unregistered;
289     }
290 
291     private static final int _WORKERS_CORE_SIZE = 2;
292 
293     private static final int _WORKERS_MAX_SIZE = 5;
294 
295     private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
296 
297     private Set<DestinationEventListener> _destinationEventListeners =
298         new ConcurrentHashSet<DestinationEventListener>();
299     private Set<MessageListener> _messageListeners =
300         new ConcurrentHashSet<MessageListener>();
301     private String _name;
302     private ThreadPoolExecutor _threadPoolExecutor;
303     private int _workersCoreSize;
304     private int _workersMaxSize;
305 
306 }