1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ListUtil;
28
29 import java.util.HashSet;
30 import java.util.List;
31 import java.util.Set;
32 import java.util.concurrent.ThreadPoolExecutor;
33
34
41 public abstract class ArrayDispatcherDestination extends BaseDestination {
42
43 public ArrayDispatcherDestination(String name) {
44 super(name);
45 }
46
47 public ArrayDispatcherDestination(
48 String name, int workersCoreSize, int workersMaxSize) {
49
50 super(name, workersCoreSize, workersMaxSize);
51 }
52
53 public synchronized void register(MessageListener listener) {
54 listener = new InvokerMessageListener(listener);
55
56 Set<MessageListener> listeners = new HashSet<MessageListener>(
57 ListUtil.fromArray(_listeners));
58
59 listeners.add(listener);
60
61 _listeners = listeners.toArray(
62 new MessageListener[listeners.size()]);
63 }
64
65 public void send(Object message) {
66 if (_listeners.length == 0) {
67 if (_log.isDebugEnabled()) {
68 _log.debug("No listeners for destination " + getName());
69 }
70
71 return;
72 }
73
74 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
75
76 if (threadPoolExecutor.isShutdown()) {
77 throw new IllegalStateException(
78 "Destination " + getName() + " is shutdown and cannot " +
79 "receive more messages");
80 }
81
82 dispatch(_listeners, message);
83 }
84
85 public void send(String message) {
86 if (_listeners.length == 0) {
87 if (_log.isDebugEnabled()) {
88 _log.debug("No listeners for destination " + getName());
89 }
90
91 return;
92 }
93
94 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
95
96 if (threadPoolExecutor.isShutdown()) {
97 throw new IllegalStateException(
98 "Destination " + getName() + " is shutdown and cannot " +
99 "receive more messages");
100 }
101
102 dispatch(_listeners, message);
103 }
104
105 public synchronized boolean unregister(MessageListener listener) {
106 listener = new InvokerMessageListener(listener);
107
108 List<MessageListener> listeners = ListUtil.fromArray(_listeners);
109
110 boolean value = listeners.remove(listener);
111
112 if (value) {
113 _listeners = listeners.toArray(
114 new MessageListener[listeners.size()]);
115 }
116
117 return value;
118 }
119
120 protected abstract void dispatch(
121 MessageListener[] listeners, Object message);
122
123 protected abstract void dispatch(
124 MessageListener[] listeners, String message);
125
126 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
127
128 private MessageListener[] _listeners = new MessageListener[0];
129
130 }