1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.ConcurrentHashSet;
25
26 import java.util.Iterator;
27 import java.util.Set;
28 import java.util.concurrent.ThreadPoolExecutor;
29
30
38 public abstract class IteratorDispatcherDestination extends BaseDestination {
39
40 public IteratorDispatcherDestination(String name) {
41 super(name);
42 }
43
44 public IteratorDispatcherDestination(
45 String name, int workersCoreSize, int workersMaxSize) {
46
47 super(name, workersCoreSize, workersMaxSize);
48 }
49
50 public void register(MessageListener listener) {
51 listener = new InvokerMessageListener(listener);
52
53 _listeners.add(listener);
54 }
55
56 public void send(Message message) {
57 if (_listeners.size() == 0) {
58 if (_log.isDebugEnabled()) {
59 _log.debug("No listeners for destination " + getName());
60 }
61
62 return;
63 }
64
65 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
66
67 if (threadPoolExecutor.isShutdown()) {
68 throw new IllegalStateException(
69 "Destination " + getName() + " is shutdown and cannot " +
70 "receive more messages");
71 }
72
73 dispatch(_listeners.iterator(), message);
74 }
75
76 public boolean unregister(MessageListener listener) {
77 listener = new InvokerMessageListener(listener);
78
79 return _listeners.remove(listener);
80 }
81
82 public int getListenerCount() {
83 return _listeners.size();
84 }
85
86 protected abstract void dispatch(
87 Iterator<MessageListener> listenersItr, Message message);
88
89 private static Log _log =
90 LogFactoryUtil.getLog(IteratorDispatcherDestination.class);
91
92 private Set<MessageListener> _listeners =
93 new ConcurrentHashSet<MessageListener>();
94
95 }