1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.ConcurrentHashSet;
25
26 import java.util.Set;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29
36 public abstract class ArrayDispatcherDestination extends BaseDestination {
37
38 public ArrayDispatcherDestination(String name) {
39 super(name);
40 }
41
42 public ArrayDispatcherDestination(
43 String name, int workersCoreSize, int workersMaxSize) {
44
45 super(name, workersCoreSize, workersMaxSize);
46 }
47
48 public int getListenerCount() {
49 return _listeners.size();
50 }
51
52 public void register(MessageListener listener) {
53 listener = new InvokerMessageListener(listener);
54
55 _listeners.add(listener);
56 }
57
58 public void send(Message message) {
59 if (_listeners.isEmpty()) {
60 if (_log.isDebugEnabled()) {
61 _log.debug("No listeners for destination " + getName());
62 }
63
64 return;
65 }
66
67 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
68
69 if (threadPoolExecutor.isShutdown()) {
70 throw new IllegalStateException(
71 "Destination " + getName() + " is shutdown and cannot " +
72 "receive more messages");
73 }
74
75 dispatch(_listeners, message);
76 }
77
78 public boolean unregister(MessageListener listener) {
79 listener = new InvokerMessageListener(listener);
80
81 return _listeners.remove(listener);
82 }
83
84 protected abstract void dispatch(
85 Set<MessageListener> listeners, Message message);
86
87 private static Log _log =
88 LogFactoryUtil.getLog(ArrayDispatcherDestination.class);
89
90 private Set<MessageListener> _listeners =
91 new ConcurrentHashSet<MessageListener>();
92
93 }