1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.kernel.messaging;
16  
17  import com.liferay.portal.kernel.log.Log;
18  import com.liferay.portal.kernel.log.LogFactoryUtil;
19  import com.liferay.portal.kernel.util.ConcurrentHashSet;
20  
21  import java.util.Collection;
22  import java.util.HashMap;
23  import java.util.Map;
24  import java.util.Set;
25  
26  /**
27   * <a href="DefaultMessageBus.java.html"><b><i>View Source</i></b></a>
28   *
29   * @author Michael C. Han
30   */
31  public class DefaultMessageBus implements MessageBus {
32  
33      public synchronized void addDestination(Destination destination) {
34          _destinations.put(destination.getName(), destination);
35  
36          fireDestinationAddedEvent(destination);
37      }
38  
39      public void addDestinationEventListener(
40          DestinationEventListener destinationEventListener) {
41  
42          _destinationEventListeners.add(destinationEventListener);
43      }
44  
45      public void addDestinationEventListener(
46          String destinationName,
47          DestinationEventListener destinationEventListener) {
48  
49          Destination destination = _destinations.get(destinationName);
50  
51          if (destination != null) {
52              destination.addDestinationEventListener(destinationEventListener);
53          }
54      }
55  
56      public void destroy() {
57          shutdown(true);
58      }
59  
60      public Destination getDestination(String destinationName) {
61          return _destinations.get(destinationName);
62      }
63  
64      public int getDestinationCount() {
65          return _destinations.size();
66      }
67  
68      public Collection<String> getDestinationNames() {
69          return _destinations.keySet();
70      }
71  
72      public Collection<Destination> getDestinations() {
73          return _destinations.values();
74      }
75  
76      public boolean hasDestination(String destinationName) {
77          return _destinations.containsKey(destinationName);
78      }
79  
80      public boolean hasMessageListener(String destinationName) {
81          Destination destination = _destinations.get(destinationName);
82  
83          if ((destination != null) && destination.isRegistered()) {
84              return true;
85          }
86          else {
87              return false;
88          }
89      }
90  
91      public synchronized boolean registerMessageListener(
92          String destinationName, MessageListener messageListener) {
93  
94          Destination destination = _destinations.get(destinationName);
95  
96          if (destination == null) {
97              throw new IllegalStateException(
98                  "Destination " + destinationName + " is not configured");
99          }
100 
101         boolean registered = destination.register(messageListener);
102 
103         if (registered) {
104             fireMessageListenerRegisteredEvent(destination, messageListener);
105         }
106 
107         return registered;
108     }
109 
110     public synchronized Destination removeDestination(String destinationName) {
111         Destination destinationModel = _destinations.remove(destinationName);
112 
113         destinationModel.removeDestinationEventListeners();
114         destinationModel.unregisterMessageListeners();
115 
116         fireDestinationRemovedEvent(destinationModel);
117 
118         return destinationModel;
119     }
120 
121     public void removeDestinationEventListener(
122         DestinationEventListener destinationEventListener) {
123 
124         _destinationEventListeners.remove(destinationEventListener);
125     }
126 
127     public void removeDestinationEventListener(
128         String destinationName,
129         DestinationEventListener destinationEventListener) {
130 
131         Destination destination = _destinations.get(destinationName);
132 
133         if (destination != null) {
134             destination.removeDestinationEventListener(
135                 destinationEventListener);
136         }
137     }
138 
139     public void replace(Destination destination) {
140         Destination oldDestination = _destinations.get(destination.getName());
141 
142         oldDestination.copyDestinationEventListeners(destination);
143         oldDestination.copyMessageListeners(destination);
144 
145         removeDestination(oldDestination.getName());
146 
147         addDestination(destination);
148     }
149 
150     public void sendMessage(String destinationName, Message message) {
151         Destination destination = _destinations.get(destinationName);
152 
153         if (destination == null) {
154             if (_log.isWarnEnabled()) {
155                 _log.warn(
156                     "Destination " + destinationName + " is not configured");
157             }
158 
159             return;
160         }
161 
162         message.setDestinationName(destinationName);
163 
164         destination.send(message);
165     }
166 
167     public void shutdown() {
168         shutdown(false);
169     }
170 
171     public synchronized void shutdown(boolean force) {
172         for (Destination destination : _destinations.values()) {
173             destination.close(force);
174         }
175     }
176 
177     public synchronized boolean unregisterMessageListener(
178         String destinationName, MessageListener messageListener) {
179 
180         Destination destination = _destinations.get(destinationName);
181 
182         if (destination == null) {
183             return false;
184         }
185 
186         boolean unregistered = destination.unregister(messageListener);
187 
188         if (unregistered) {
189             fireMessageListenerUnregisteredEvent(destination, messageListener);
190         }
191 
192         return unregistered;
193     }
194 
195     protected void fireDestinationAddedEvent(Destination destination) {
196         for (DestinationEventListener listener : _destinationEventListeners) {
197             listener.destinationAdded(destination);
198         }
199     }
200 
201     protected void fireDestinationRemovedEvent(Destination destination) {
202         for (DestinationEventListener listener : _destinationEventListeners) {
203             listener.destinationRemoved(destination);
204         }
205     }
206 
207     protected void fireMessageListenerRegisteredEvent(
208         Destination destination, MessageListener messageListener) {
209 
210         for (DestinationEventListener destinationEventListener :
211                 _destinationEventListeners) {
212 
213             destinationEventListener.messageListenerRegistered(
214                 destination.getName(), messageListener);
215         }
216     }
217 
218     protected void fireMessageListenerUnregisteredEvent(
219         Destination destination, MessageListener messageListener) {
220 
221         for (DestinationEventListener destinationEventListener :
222                 _destinationEventListeners) {
223 
224             destinationEventListener.messageListenerUnregistered(
225                 destination.getName(), messageListener);
226         }
227     }
228 
229     private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
230 
231     private Set<DestinationEventListener> _destinationEventListeners =
232         new ConcurrentHashSet<DestinationEventListener>();
233     private Map<String, Destination> _destinations =
234         new HashMap<String, Destination>();
235 
236 }