001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.util.ConcurrentHashSet;
020    import com.liferay.portal.kernel.util.NamedThreadFactory;
021    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
022    import com.liferay.portal.kernel.util.StringPool;
023    import com.liferay.portal.kernel.util.Validator;
024    
025    import java.util.List;
026    import java.util.Set;
027    import java.util.concurrent.LinkedBlockingQueue;
028    import java.util.concurrent.RejectedExecutionHandler;
029    import java.util.concurrent.ThreadPoolExecutor;
030    import java.util.concurrent.TimeUnit;
031    
032    /**
033     * @author Michael C. Han
034     */
035    public abstract class BaseDestination implements Destination {
036    
037            public BaseDestination() {
038            }
039    
040            /**
041             * @deprecated
042             */
043            public BaseDestination(String name) {
044                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
045            }
046    
047            /**
048             * @deprecated
049             */
050            public BaseDestination(
051                    String name, int workersCoreSize, int workersMaxSize) {
052    
053                    _name = name;
054                    _workersCoreSize = workersCoreSize;
055                    _workersMaxSize = workersMaxSize;
056    
057                    open();
058            }
059    
060            public void addDestinationEventListener(
061                    DestinationEventListener destinationEventListener) {
062    
063                    _destinationEventListeners.add(destinationEventListener);
064            }
065    
066            public void afterPropertiesSet() {
067                    if (Validator.isNull(_name)) {
068                            throw new IllegalArgumentException("Name is null");
069                    }
070    
071                    open();
072            }
073    
074            public synchronized void close() {
075                    close(false);
076            }
077    
078            public synchronized void close(boolean force) {
079                    doClose(force);
080            }
081    
082            public void copyDestinationEventListeners(Destination destination) {
083                    for (DestinationEventListener destinationEventListener :
084                                    _destinationEventListeners) {
085    
086                            destination.addDestinationEventListener(
087                                    destinationEventListener);
088                    }
089            }
090    
091            public void copyMessageListeners(Destination destination) {
092                    for (MessageListener messageListener : _messageListeners) {
093                            InvokerMessageListener invokerMessageListener =
094                                    (InvokerMessageListener)messageListener;
095    
096                            destination.register(
097                                    invokerMessageListener.getMessageListener(),
098                                    invokerMessageListener.getClassLoader());
099                    }
100            }
101    
102            public DestinationStatistics getDestinationStatistics() {
103                    DestinationStatistics destinationStatistics =
104                            new DestinationStatistics();
105    
106                    destinationStatistics.setActiveThreadCount(
107                            _threadPoolExecutor.getActiveCount());
108                    destinationStatistics.setCurrentThreadCount(
109                            _threadPoolExecutor.getPoolSize());
110                    destinationStatistics.setLargestThreadCount(
111                            _threadPoolExecutor.getLargestPoolSize());
112                    destinationStatistics.setMaxThreadPoolSize(
113                            _threadPoolExecutor.getMaximumPoolSize());
114                    destinationStatistics.setMinThreadPoolSize(
115                            _threadPoolExecutor.getCorePoolSize());
116                    destinationStatistics.setPendingMessageCount(
117                            _threadPoolExecutor.getQueue().size());
118                    destinationStatistics.setSentMessageCount(
119                            _threadPoolExecutor.getCompletedTaskCount());
120    
121                    return destinationStatistics;
122            }
123    
124            public int getMaximumQueueSize() {
125                    return _maximumQueueSize;
126            }
127    
128            public int getMessageListenerCount() {
129                    return _messageListeners.size();
130            }
131    
132            public String getName() {
133                    return _name;
134            }
135    
136            public int getWorkersCoreSize() {
137                    return _workersCoreSize;
138            }
139    
140            public int getWorkersMaxSize() {
141                    return _workersMaxSize;
142            }
143    
144            public boolean isRegistered() {
145                    if (getMessageListenerCount() > 0) {
146                            return true;
147                    }
148                    else {
149                            return false;
150                    }
151            }
152    
153            public synchronized void open() {
154                    doOpen();
155            }
156    
157            public boolean register(MessageListener messageListener) {
158                    InvokerMessageListener invokerMessageListener =
159                            new InvokerMessageListener(messageListener);
160    
161                    return registerMessageListener(invokerMessageListener);
162            }
163    
164            public boolean register(
165                    MessageListener messageListener, ClassLoader classloader) {
166    
167                    InvokerMessageListener invokerMessageListener =
168                            new InvokerMessageListener(messageListener, classloader);
169    
170                    return registerMessageListener(invokerMessageListener);
171            }
172    
173            public void removeDestinationEventListener(
174                    DestinationEventListener destinationEventListener) {
175    
176                    _destinationEventListeners.remove(destinationEventListener);
177            }
178    
179            public void removeDestinationEventListeners() {
180                    _destinationEventListeners.clear();
181            }
182    
183            public void send(Message message) {
184                    if (_messageListeners.isEmpty()) {
185                            if (_log.isDebugEnabled()) {
186                                    _log.debug("No message listeners for destination " + getName());
187                            }
188    
189                            return;
190                    }
191    
192                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
193    
194                    if (threadPoolExecutor.isShutdown()) {
195                            throw new IllegalStateException(
196                                    "Destination " + getName() + " is shutdown and cannot " +
197                                            "receive more messages");
198                    }
199    
200                    dispatch(_messageListeners, message);
201            }
202    
203            public void setMaximumQueueSize(int maximumQueueSize) {
204                    _maximumQueueSize = maximumQueueSize;
205            }
206    
207            public void setName(String name) {
208                    _name = name;
209            }
210    
211            public void setRejectedExecutionHandler(
212                    RejectedExecutionHandler rejectedExecutionHandler) {
213    
214                    _rejectedExecutionHandler = rejectedExecutionHandler;
215            }
216    
217            public void setWorkersCoreSize(int workersCoreSize) {
218                    _workersCoreSize = workersCoreSize;
219            }
220    
221            public void setWorkersMaxSize(int workersMaxSize) {
222                    _workersMaxSize = workersMaxSize;
223            }
224    
225            public boolean unregister(MessageListener messageListener) {
226                    InvokerMessageListener invokerMessageListener =
227                            new InvokerMessageListener(messageListener);
228    
229                    return unregisterMessageListener(invokerMessageListener);
230            }
231    
232            public boolean unregister(
233                    MessageListener messageListener, ClassLoader classloader) {
234    
235                    InvokerMessageListener invokerMessageListener =
236                            new InvokerMessageListener(messageListener, classloader);
237    
238                    return unregisterMessageListener(invokerMessageListener);
239            }
240    
241            public void unregisterMessageListeners() {
242                    for (MessageListener messageListener : _messageListeners) {
243                            unregisterMessageListener((InvokerMessageListener)messageListener);
244                    }
245            }
246    
247            protected RejectedExecutionHandler createRejectionExecutionHandler() {
248                    return new RejectedExecutionHandler() {
249    
250                            public void rejectedExecution(
251                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
252    
253                                    if (!_log.isWarnEnabled()) {
254                                            return;
255                                    }
256    
257                                    MessageRunnable messageRunnable =
258                                            (MessageRunnable)runnable;
259    
260                                    _log.warn(
261                                            "Discarding message " + messageRunnable.getMessage() +
262                                                    " because it exceeds the maximum queue size of " +
263                                                            _maximumQueueSize);
264                            }
265    
266                    };
267            }
268    
269            protected abstract void dispatch(
270                    Set<MessageListener> messageListeners, Message message);
271    
272            protected void doClose(boolean force) {
273                    if (!_threadPoolExecutor.isShutdown() &&
274                            !_threadPoolExecutor.isTerminating()) {
275    
276                            if (!force) {
277                                    _threadPoolExecutor.shutdown();
278                            }
279                            else {
280                                    List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
281    
282                                    if (_log.isInfoEnabled()) {
283                                            _log.info(
284                                                    "The following " + pendingTasks.size() + " tasks " +
285                                                            "were not executed due to shutown: " +
286                                                                    pendingTasks);
287                                    }
288                            }
289                    }
290            }
291    
292            protected void doOpen() {
293                    if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
294                            ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
295    
296                            _threadPoolExecutor = new ThreadPoolExecutor(
297                                    _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
298                                    new LinkedBlockingQueue<Runnable>(_maximumQueueSize),
299                                    new NamedThreadFactory(
300                                            getName(), Thread.NORM_PRIORITY, classLoader));
301    
302                            if (_rejectedExecutionHandler == null) {
303                                    _rejectedExecutionHandler = createRejectionExecutionHandler();
304                            }
305    
306                            _threadPoolExecutor.setRejectedExecutionHandler(
307                                    _rejectedExecutionHandler);
308                    }
309            }
310    
311            protected void fireMessageListenerRegisteredEvent(
312                    MessageListener messageListener) {
313    
314                    for (DestinationEventListener destinationEventListener :
315                                    _destinationEventListeners) {
316    
317                            destinationEventListener.messageListenerRegistered(
318                                    getName(), messageListener);
319                    }
320            }
321    
322            protected void fireMessageListenerUnregisteredEvent(
323                    MessageListener messageListener) {
324    
325                    for (DestinationEventListener listener : _destinationEventListeners) {
326                            listener.messageListenerUnregistered(getName(), messageListener);
327                    }
328            }
329    
330            protected ThreadPoolExecutor getThreadPoolExecutor() {
331                    return _threadPoolExecutor;
332            }
333    
334            protected boolean registerMessageListener(
335                    InvokerMessageListener invokerMessageListener) {
336    
337                    boolean registered = _messageListeners.add(invokerMessageListener);
338    
339                    if (registered) {
340                            fireMessageListenerRegisteredEvent(
341                                    invokerMessageListener.getMessageListener());
342                    }
343    
344                    return registered;
345            }
346    
347            protected boolean unregisterMessageListener(
348                    InvokerMessageListener invokerMessageListener) {
349    
350                    boolean unregistered = _messageListeners.remove(invokerMessageListener);
351    
352                    if (unregistered) {
353                            fireMessageListenerUnregisteredEvent(
354                                    invokerMessageListener.getMessageListener());
355                    }
356    
357                    return unregistered;
358            }
359    
360            private static final int _WORKERS_CORE_SIZE = 2;
361    
362            private static final int _WORKERS_MAX_SIZE = 5;
363    
364            private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
365    
366            private Set<DestinationEventListener> _destinationEventListeners =
367                    new ConcurrentHashSet<DestinationEventListener>();
368            private int _maximumQueueSize = Integer.MAX_VALUE;
369            private Set<MessageListener> _messageListeners =
370                    new ConcurrentHashSet<MessageListener>();
371            private String _name = StringPool.BLANK;
372            private RejectedExecutionHandler _rejectedExecutionHandler;
373            private ThreadPoolExecutor _threadPoolExecutor;
374            private int _workersCoreSize = _WORKERS_CORE_SIZE;
375            private int _workersMaxSize = _WORKERS_MAX_SIZE;
376    
377    }