1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ListUtil;
28
29 import java.util.Arrays;
30 import java.util.HashSet;
31 import java.util.List;
32 import java.util.Set;
33 import java.util.concurrent.ThreadPoolExecutor;
34
35
42 public abstract class ArrayDispatcherDestination extends BaseDestination {
43
44 public ArrayDispatcherDestination(String name) {
45 super(name);
46 }
47
48 public ArrayDispatcherDestination(
49 String name, int workersCoreSize, int workersMaxSize) {
50
51 super(name, workersCoreSize, workersMaxSize);
52 }
53
54 public synchronized void register(MessageListener listener) {
55 listener = new InvokerMessageListener(listener);
56
57 Set<MessageListener> listeners = new HashSet<MessageListener>(
58 Arrays.asList(_listeners));
59
60 listeners.add(listener);
61
62 _listeners = listeners.toArray(
63 new MessageListener[listeners.size()]);
64
65 setListenersCount(listeners.size());
66 }
67
68 public void send(Message message) {
69 if (_listeners.length == 0) {
70 if (_log.isDebugEnabled()) {
71 _log.debug("No listeners for destination " + getName());
72 }
73
74 return;
75 }
76
77 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
78
79 if (threadPoolExecutor.isShutdown()) {
80 throw new IllegalStateException(
81 "Destination " + getName() + " is shutdown and cannot " +
82 "receive more messages");
83 }
84
85 dispatch(_listeners, message);
86 }
87
88 public synchronized boolean unregister(MessageListener listener) {
89 listener = new InvokerMessageListener(listener);
90
91 List<MessageListener> listeners = ListUtil.fromArray(_listeners);
92
93 boolean value = listeners.remove(listener);
94
95 if (value) {
96 _listeners = listeners.toArray(
97 new MessageListener[listeners.size()]);
98
99 setListenersCount(listeners.size());
100 }
101
102 return value;
103 }
104
105 protected abstract void dispatch(
106 MessageListener[] listeners, Message message);
107
108 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
109
110 private MessageListener[] _listeners = new MessageListener[0];
111
112 }