1
14
15 package com.liferay.portal.kernel.messaging;
16
17 import com.liferay.portal.kernel.log.Log;
18 import com.liferay.portal.kernel.log.LogFactoryUtil;
19 import com.liferay.portal.kernel.util.ConcurrentHashSet;
20
21 import java.util.Collection;
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Set;
25
26
31 public class DefaultMessageBus implements MessageBus {
32
33 public synchronized void addDestination(Destination destination) {
34 _destinations.put(destination.getName(), destination);
35
36 fireDestinationAddedEvent(destination);
37 }
38
39 public void addDestinationEventListener(
40 DestinationEventListener destinationEventListener) {
41
42 _destinationEventListeners.add(destinationEventListener);
43 }
44
45 public void addDestinationEventListener(
46 String destinationName,
47 DestinationEventListener destinationEventListener) {
48
49 Destination destination = _destinations.get(destinationName);
50
51 if (destination != null) {
52 destination.addDestinationEventListener(destinationEventListener);
53 }
54 }
55
56 public void destroy() {
57 shutdown(true);
58 }
59
60 public Destination getDestination(String destinationName) {
61 return _destinations.get(destinationName);
62 }
63
64 public int getDestinationCount() {
65 return _destinations.size();
66 }
67
68 public Collection<String> getDestinationNames() {
69 return _destinations.keySet();
70 }
71
72 public Collection<Destination> getDestinations() {
73 return _destinations.values();
74 }
75
76 public boolean hasDestination(String destinationName) {
77 return _destinations.containsKey(destinationName);
78 }
79
80 public boolean hasMessageListener(String destinationName) {
81 Destination destination = _destinations.get(destinationName);
82
83 if ((destination != null) && destination.isRegistered()) {
84 return true;
85 }
86 else {
87 return false;
88 }
89 }
90
91 public synchronized boolean registerMessageListener(
92 String destinationName, MessageListener messageListener) {
93
94 Destination destination = _destinations.get(destinationName);
95
96 if (destination == null) {
97 throw new IllegalStateException(
98 "Destination " + destinationName + " is not configured");
99 }
100
101 boolean registered = destination.register(messageListener);
102
103 if (registered) {
104 fireMessageListenerRegisteredEvent(destination, messageListener);
105 }
106
107 return registered;
108 }
109
110 public synchronized Destination removeDestination(String destinationName) {
111 Destination destinationModel = _destinations.remove(destinationName);
112
113 destinationModel.removeDestinationEventListeners();
114 destinationModel.unregisterMessageListeners();
115
116 fireDestinationRemovedEvent(destinationModel);
117
118 return destinationModel;
119 }
120
121 public void removeDestinationEventListener(
122 DestinationEventListener destinationEventListener) {
123
124 _destinationEventListeners.remove(destinationEventListener);
125 }
126
127 public void removeDestinationEventListener(
128 String destinationName,
129 DestinationEventListener destinationEventListener) {
130
131 Destination destination = _destinations.get(destinationName);
132
133 if (destination != null) {
134 destination.removeDestinationEventListener(
135 destinationEventListener);
136 }
137 }
138
139 public void replace(Destination destination) {
140 Destination oldDestination = _destinations.get(destination.getName());
141
142 oldDestination.copyDestinationEventListeners(destination);
143 oldDestination.copyMessageListeners(destination);
144
145 removeDestination(oldDestination.getName());
146
147 addDestination(destination);
148 }
149
150 public void sendMessage(String destinationName, Message message) {
151 Destination destination = _destinations.get(destinationName);
152
153 if (destination == null) {
154 if (_log.isWarnEnabled()) {
155 _log.warn(
156 "Destination " + destinationName + " is not configured");
157 }
158
159 return;
160 }
161
162 message.setDestinationName(destinationName);
163
164 destination.send(message);
165 }
166
167 public void shutdown() {
168 shutdown(false);
169 }
170
171 public synchronized void shutdown(boolean force) {
172 for (Destination destination : _destinations.values()) {
173 destination.close(force);
174 }
175 }
176
177 public synchronized boolean unregisterMessageListener(
178 String destinationName, MessageListener messageListener) {
179
180 Destination destination = _destinations.get(destinationName);
181
182 if (destination == null) {
183 return false;
184 }
185
186 boolean unregistered = destination.unregister(messageListener);
187
188 if (unregistered) {
189 fireMessageListenerUnregisteredEvent(destination, messageListener);
190 }
191
192 return unregistered;
193 }
194
195 protected void fireDestinationAddedEvent(Destination destination) {
196 for (DestinationEventListener listener : _destinationEventListeners) {
197 listener.destinationAdded(destination);
198 }
199 }
200
201 protected void fireDestinationRemovedEvent(Destination destination) {
202 for (DestinationEventListener listener : _destinationEventListeners) {
203 listener.destinationRemoved(destination);
204 }
205 }
206
207 protected void fireMessageListenerRegisteredEvent(
208 Destination destination, MessageListener messageListener) {
209
210 for (DestinationEventListener destinationEventListener :
211 _destinationEventListeners) {
212
213 destinationEventListener.messageListenerRegistered(
214 destination.getName(), messageListener);
215 }
216 }
217
218 protected void fireMessageListenerUnregisteredEvent(
219 Destination destination, MessageListener messageListener) {
220
221 for (DestinationEventListener destinationEventListener :
222 _destinationEventListeners) {
223
224 destinationEventListener.messageListenerUnregistered(
225 destination.getName(), messageListener);
226 }
227 }
228
229 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
230
231 private Set<DestinationEventListener> _destinationEventListeners =
232 new ConcurrentHashSet<DestinationEventListener>();
233 private Map<String, Destination> _destinations =
234 new HashMap<String, Destination>();
235
236 }