001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.util.ConcurrentHashSet;
020    
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.Set;
025    
026    /**
027     * @author Michael C. Han
028     */
029    public class DefaultMessageBus implements MessageBus {
030    
031            public synchronized void addDestination(Destination destination) {
032                    _destinations.put(destination.getName(), destination);
033    
034                    fireDestinationAddedEvent(destination);
035            }
036    
037            public void addDestinationEventListener(
038                    DestinationEventListener destinationEventListener) {
039    
040                    _destinationEventListeners.add(destinationEventListener);
041            }
042    
043            public void addDestinationEventListener(
044                    String destinationName,
045                    DestinationEventListener destinationEventListener) {
046    
047                    Destination destination = _destinations.get(destinationName);
048    
049                    if (destination != null) {
050                            destination.addDestinationEventListener(destinationEventListener);
051                    }
052            }
053    
054            public void destroy() {
055                    shutdown(true);
056            }
057    
058            public Destination getDestination(String destinationName) {
059                    return _destinations.get(destinationName);
060            }
061    
062            public int getDestinationCount() {
063                    return _destinations.size();
064            }
065    
066            public Collection<String> getDestinationNames() {
067                    return _destinations.keySet();
068            }
069    
070            public Collection<Destination> getDestinations() {
071                    return _destinations.values();
072            }
073    
074            public boolean hasDestination(String destinationName) {
075                    return _destinations.containsKey(destinationName);
076            }
077    
078            public boolean hasMessageListener(String destinationName) {
079                    Destination destination = _destinations.get(destinationName);
080    
081                    if ((destination != null) && destination.isRegistered()) {
082                            return true;
083                    }
084                    else {
085                            return false;
086                    }
087            }
088    
089            public synchronized boolean registerMessageListener(
090                    String destinationName, MessageListener messageListener) {
091    
092                    Destination destination = _destinations.get(destinationName);
093    
094                    if (destination == null) {
095                            throw new IllegalStateException(
096                                    "Destination " + destinationName + " is not configured");
097                    }
098    
099                    boolean registered = destination.register(messageListener);
100    
101                    if (registered) {
102                            fireMessageListenerRegisteredEvent(destination, messageListener);
103                    }
104    
105                    return registered;
106            }
107    
108            public synchronized Destination removeDestination(String destinationName) {
109                    Destination destinationModel = _destinations.remove(destinationName);
110    
111                    destinationModel.removeDestinationEventListeners();
112                    destinationModel.unregisterMessageListeners();
113    
114                    fireDestinationRemovedEvent(destinationModel);
115    
116                    return destinationModel;
117            }
118    
119            public void removeDestinationEventListener(
120                    DestinationEventListener destinationEventListener) {
121    
122                    _destinationEventListeners.remove(destinationEventListener);
123            }
124    
125            public void removeDestinationEventListener(
126                    String destinationName,
127                    DestinationEventListener destinationEventListener) {
128    
129                    Destination destination = _destinations.get(destinationName);
130    
131                    if (destination != null) {
132                            destination.removeDestinationEventListener(
133                                    destinationEventListener);
134                    }
135            }
136    
137            public void replace(Destination destination) {
138                    Destination oldDestination = _destinations.get(destination.getName());
139    
140                    oldDestination.copyDestinationEventListeners(destination);
141                    oldDestination.copyMessageListeners(destination);
142    
143                    removeDestination(oldDestination.getName());
144    
145                    addDestination(destination);
146            }
147    
148            public void sendMessage(String destinationName, Message message) {
149                    Destination destination = _destinations.get(destinationName);
150    
151                    if (destination == null) {
152                            if (_log.isWarnEnabled()) {
153                                    _log.warn(
154                                            "Destination " + destinationName + " is not configured");
155                            }
156    
157                            return;
158                    }
159    
160                    message.setDestinationName(destinationName);
161    
162                    destination.send(message);
163            }
164    
165            public void shutdown() {
166                    shutdown(false);
167            }
168    
169            public synchronized void shutdown(boolean force) {
170                    for (Destination destination : _destinations.values()) {
171                            destination.close(force);
172                    }
173            }
174    
175            public synchronized boolean unregisterMessageListener(
176                    String destinationName, MessageListener messageListener) {
177    
178                    Destination destination = _destinations.get(destinationName);
179    
180                    if (destination == null) {
181                            return false;
182                    }
183    
184                    boolean unregistered = destination.unregister(messageListener);
185    
186                    if (unregistered) {
187                            fireMessageListenerUnregisteredEvent(destination, messageListener);
188                    }
189    
190                    return unregistered;
191            }
192    
193            protected void fireDestinationAddedEvent(Destination destination) {
194                    for (DestinationEventListener listener : _destinationEventListeners) {
195                            listener.destinationAdded(destination);
196                    }
197            }
198    
199            protected void fireDestinationRemovedEvent(Destination destination) {
200                    for (DestinationEventListener listener : _destinationEventListeners) {
201                            listener.destinationRemoved(destination);
202                    }
203            }
204    
205            protected void fireMessageListenerRegisteredEvent(
206                    Destination destination, MessageListener messageListener) {
207    
208                    for (DestinationEventListener destinationEventListener :
209                                    _destinationEventListeners) {
210    
211                            destinationEventListener.messageListenerRegistered(
212                                    destination.getName(), messageListener);
213                    }
214            }
215    
216            protected void fireMessageListenerUnregisteredEvent(
217                    Destination destination, MessageListener messageListener) {
218    
219                    for (DestinationEventListener destinationEventListener :
220                                    _destinationEventListeners) {
221    
222                            destinationEventListener.messageListenerUnregistered(
223                                    destination.getName(), messageListener);
224                    }
225            }
226    
227            private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
228    
229            private Set<DestinationEventListener> _destinationEventListeners =
230                    new ConcurrentHashSet<DestinationEventListener>();
231            private Map<String, Destination> _destinations =
232                    new HashMap<String, Destination>();
233    
234    }