001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.util.ConcurrentHashSet;
020
021 import java.util.Collection;
022 import java.util.HashMap;
023 import java.util.Map;
024 import java.util.Set;
025
026
029 public class DefaultMessageBus implements MessageBus {
030
031 public synchronized void addDestination(Destination destination) {
032 _destinations.put(destination.getName(), destination);
033
034 fireDestinationAddedEvent(destination);
035 }
036
037 public void addDestinationEventListener(
038 DestinationEventListener destinationEventListener) {
039
040 _destinationEventListeners.add(destinationEventListener);
041 }
042
043 public void addDestinationEventListener(
044 String destinationName,
045 DestinationEventListener destinationEventListener) {
046
047 Destination destination = _destinations.get(destinationName);
048
049 if (destination != null) {
050 destination.addDestinationEventListener(destinationEventListener);
051 }
052 }
053
054 public void destroy() {
055 shutdown(true);
056 }
057
058 public Destination getDestination(String destinationName) {
059 return _destinations.get(destinationName);
060 }
061
062 public int getDestinationCount() {
063 return _destinations.size();
064 }
065
066 public Collection<String> getDestinationNames() {
067 return _destinations.keySet();
068 }
069
070 public Collection<Destination> getDestinations() {
071 return _destinations.values();
072 }
073
074 public boolean hasDestination(String destinationName) {
075 return _destinations.containsKey(destinationName);
076 }
077
078 public boolean hasMessageListener(String destinationName) {
079 Destination destination = _destinations.get(destinationName);
080
081 if ((destination != null) && destination.isRegistered()) {
082 return true;
083 }
084 else {
085 return false;
086 }
087 }
088
089 public synchronized boolean registerMessageListener(
090 String destinationName, MessageListener messageListener) {
091
092 Destination destination = _destinations.get(destinationName);
093
094 if (destination == null) {
095 throw new IllegalStateException(
096 "Destination " + destinationName + " is not configured");
097 }
098
099 boolean registered = destination.register(messageListener);
100
101 if (registered) {
102 fireMessageListenerRegisteredEvent(destination, messageListener);
103 }
104
105 return registered;
106 }
107
108 public synchronized Destination removeDestination(String destinationName) {
109 Destination destinationModel = _destinations.remove(destinationName);
110
111 destinationModel.removeDestinationEventListeners();
112 destinationModel.unregisterMessageListeners();
113
114 fireDestinationRemovedEvent(destinationModel);
115
116 return destinationModel;
117 }
118
119 public void removeDestinationEventListener(
120 DestinationEventListener destinationEventListener) {
121
122 _destinationEventListeners.remove(destinationEventListener);
123 }
124
125 public void removeDestinationEventListener(
126 String destinationName,
127 DestinationEventListener destinationEventListener) {
128
129 Destination destination = _destinations.get(destinationName);
130
131 if (destination != null) {
132 destination.removeDestinationEventListener(
133 destinationEventListener);
134 }
135 }
136
137 public void replace(Destination destination) {
138 Destination oldDestination = _destinations.get(destination.getName());
139
140 oldDestination.copyDestinationEventListeners(destination);
141 oldDestination.copyMessageListeners(destination);
142
143 removeDestination(oldDestination.getName());
144
145 addDestination(destination);
146 }
147
148 public void sendMessage(String destinationName, Message message) {
149 Destination destination = _destinations.get(destinationName);
150
151 if (destination == null) {
152 if (_log.isWarnEnabled()) {
153 _log.warn(
154 "Destination " + destinationName + " is not configured");
155 }
156
157 return;
158 }
159
160 message.setDestinationName(destinationName);
161
162 destination.send(message);
163 }
164
165 public void shutdown() {
166 shutdown(false);
167 }
168
169 public synchronized void shutdown(boolean force) {
170 for (Destination destination : _destinations.values()) {
171 destination.close(force);
172 }
173 }
174
175 public synchronized boolean unregisterMessageListener(
176 String destinationName, MessageListener messageListener) {
177
178 Destination destination = _destinations.get(destinationName);
179
180 if (destination == null) {
181 return false;
182 }
183
184 boolean unregistered = destination.unregister(messageListener);
185
186 if (unregistered) {
187 fireMessageListenerUnregisteredEvent(destination, messageListener);
188 }
189
190 return unregistered;
191 }
192
193 protected void fireDestinationAddedEvent(Destination destination) {
194 for (DestinationEventListener listener : _destinationEventListeners) {
195 listener.destinationAdded(destination);
196 }
197 }
198
199 protected void fireDestinationRemovedEvent(Destination destination) {
200 for (DestinationEventListener listener : _destinationEventListeners) {
201 listener.destinationRemoved(destination);
202 }
203 }
204
205 protected void fireMessageListenerRegisteredEvent(
206 Destination destination, MessageListener messageListener) {
207
208 for (DestinationEventListener destinationEventListener :
209 _destinationEventListeners) {
210
211 destinationEventListener.messageListenerRegistered(
212 destination.getName(), messageListener);
213 }
214 }
215
216 protected void fireMessageListenerUnregisteredEvent(
217 Destination destination, MessageListener messageListener) {
218
219 for (DestinationEventListener destinationEventListener :
220 _destinationEventListeners) {
221
222 destinationEventListener.messageListenerUnregistered(
223 destination.getName(), messageListener);
224 }
225 }
226
227 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
228
229 private Set<DestinationEventListener> _destinationEventListeners =
230 new ConcurrentHashSet<DestinationEventListener>();
231 private Map<String, Destination> _destinations =
232 new HashMap<String, Destination>();
233
234 }