1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.ConcurrentHashSet;
28
29 import java.util.Collection;
30 import java.util.HashMap;
31 import java.util.Map;
32 import java.util.Set;
33
34
39 public class DefaultMessageBus implements MessageBus {
40
41 public synchronized void addDestination(Destination destination) {
42 _destinations.put(destination.getName(), destination);
43
44 fireDestinationAddedEvent(destination);
45 }
46
47 public void addDestinationEventListener(
48 DestinationEventListener destinationEventListener) {
49
50 _destinationEventListeners.add(destinationEventListener);
51 }
52
53 public void addDestinationEventListener(
54 String destinationName,
55 DestinationEventListener destinationEventListener) {
56
57 Destination destination = _destinations.get(destinationName);
58
59 if (destination != null) {
60 destination.addDestinationEventListener(destinationEventListener);
61 }
62 }
63
64 public void destroy() {
65 shutdown(true);
66 }
67
68 public int getDestinationCount() {
69 return _destinations.size();
70 }
71
72 public Collection<String> getDestinationNames() {
73 return _destinations.keySet();
74 }
75
76 public Collection<Destination> getDestinations() {
77 return _destinations.values();
78 }
79
80 public boolean hasDestination(String destinationName) {
81 return _destinations.containsKey(destinationName);
82 }
83
84 public boolean hasMessageListener(String destinationName) {
85 Destination destination = _destinations.get(destinationName);
86
87 if ((destination != null) && destination.isRegistered()) {
88 return true;
89 }
90 else {
91 return false;
92 }
93 }
94
95 public synchronized boolean registerMessageListener(
96 String destinationName, MessageListener messageListener) {
97
98 Destination destination = _destinations.get(destinationName);
99
100 if (destination == null) {
101 throw new IllegalStateException(
102 "Destination " + destinationName + " is not configured");
103 }
104
105 boolean registered = destination.register(messageListener);
106
107 if (registered) {
108 fireMessageListenerRegisteredEvent(destination, messageListener);
109 }
110
111 return registered;
112 }
113
114 public synchronized Destination removeDestination(String destinationName) {
115 Destination destinationModel = _destinations.remove(destinationName);
116
117 destinationModel.removeDestinationEventListeners();
118 destinationModel.unregisterMessageListeners();
119
120 fireDestinationRemovedEvent(destinationModel);
121
122 return destinationModel;
123 }
124
125 public void removeDestinationEventListener(
126 DestinationEventListener destinationEventListener) {
127
128 _destinationEventListeners.remove(destinationEventListener);
129 }
130
131 public void removeDestinationEventListener(
132 String destinationName,
133 DestinationEventListener destinationEventListener) {
134
135 Destination destination = _destinations.get(destinationName);
136
137 if (destination != null) {
138 destination.removeDestinationEventListener(
139 destinationEventListener);
140 }
141 }
142
143 public void replace(Destination destination) {
144 Destination oldDestination = _destinations.get(destination.getName());
145
146 oldDestination.copyDestinationEventListeners(destination);
147 oldDestination.copyMessageListeners(destination);
148
149 removeDestination(oldDestination.getName());
150
151 addDestination(destination);
152 }
153
154 public void sendMessage(String destinationName, Message message) {
155 Destination destination = _destinations.get(destinationName);
156
157 if (destination == null) {
158 if (_log.isWarnEnabled()) {
159 _log.warn(
160 "Destination " + destinationName + " is not configured");
161 }
162
163 return;
164 }
165
166 message.setDestinationName(destinationName);
167
168 destination.send(message);
169 }
170
171 public void shutdown() {
172 shutdown(false);
173 }
174
175 public synchronized void shutdown(boolean force) {
176 for (Destination destination : _destinations.values()) {
177 destination.close(force);
178 }
179 }
180
181 public synchronized boolean unregisterMessageListener(
182 String destinationName, MessageListener messageListener) {
183
184 Destination destination = _destinations.get(destinationName);
185
186 if (destination == null) {
187 return false;
188 }
189
190 boolean unregistered = destination.unregister(messageListener);
191
192 if (unregistered) {
193 fireMessageListenerUnregisteredEvent(destination, messageListener);
194 }
195
196 return unregistered;
197 }
198
199 protected void fireDestinationAddedEvent(Destination destination) {
200 for (DestinationEventListener listener : _destinationEventListeners) {
201 listener.destinationAdded(destination);
202 }
203 }
204
205 protected void fireDestinationRemovedEvent(Destination destination) {
206 for (DestinationEventListener listener : _destinationEventListeners) {
207 listener.destinationRemoved(destination);
208 }
209 }
210
211 protected void fireMessageListenerRegisteredEvent(
212 Destination destination, MessageListener messageListener) {
213
214 for (DestinationEventListener destinationEventListener :
215 _destinationEventListeners) {
216
217 destinationEventListener.messageListenerRegistered(
218 destination.getName(), messageListener);
219 }
220 }
221
222 protected void fireMessageListenerUnregisteredEvent(
223 Destination destination, MessageListener messageListener) {
224
225 for (DestinationEventListener destinationEventListener :
226 _destinationEventListeners) {
227
228 destinationEventListener.messageListenerUnregistered(
229 destination.getName(), messageListener);
230 }
231 }
232
233 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
234
235 private Set<DestinationEventListener> _destinationEventListeners =
236 new ConcurrentHashSet<DestinationEventListener>();
237 private Map<String, Destination> _destinations =
238 new HashMap<String, Destination>();
239
240 }