1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.HashMap;
28 import java.util.List;
29 import java.util.Map;
30
31
37 public class DefaultMessageBus implements MessageBus {
38
39 public synchronized void addDestination(Destination destination) {
40 _destinations.put(destination.getName(), destination);
41 fireDestinationAddedEvent(destination);
42 }
43
44 public void addDestinationEventListener(DestinationEventListener listener) {
45 _destinationEventListeners.add(listener);
46 }
47
48 public void destroy() {
49 shutdown(true);
50 }
51
52 public int getDestinationCount() {
53 return _destinations.size();
54 }
55
56 public Collection<String> getDestinationNames() {
57 return _destinations.keySet();
58 }
59
60 public Collection<Destination> getDestinations() {
61 return _destinations.values();
62 }
63
64 public boolean hasDestination(String destinationName) {
65 return _destinations.containsKey(destinationName);
66 }
67
68 public boolean hasMessageListener(String destination) {
69 Destination destinationModel = _destinations.get(destination);
70
71 if ((destinationModel != null) && destinationModel.isRegistered()) {
72 return true;
73 }
74 else {
75 return false;
76 }
77 }
78
79 public synchronized void registerMessageListener(
80 String destination, MessageListener listener) {
81
82 Destination destinationModel = _destinations.get(destination);
83
84 if (destinationModel == null) {
85 throw new IllegalStateException(
86 "Destination " + destination + " is not configured");
87 }
88
89 destinationModel.register(listener);
90 }
91
92 public synchronized void removeDestination(String destination) {
93 Destination destinationModel = _destinations.remove(destination);
94
95 fireDestinationRemovedEvent(destinationModel);
96 }
97
98 public void removeDestinationEventListener(
99 DestinationEventListener listener) {
100
101 _destinationEventListeners.remove(listener);
102 }
103
104 public void sendMessage(String destination, Message message) {
105 Destination destinationModel = _destinations.get(destination);
106
107 if (destinationModel == null) {
108 if (_log.isWarnEnabled()) {
109 _log.warn("Destination " + destination + " is not configured");
110 }
111
112 return;
113 }
114
115 message.setDestination(destination);
116
117 destinationModel.send(message);
118 }
119
120 public void shutdown() {
121 shutdown(false);
122 }
123
124 public synchronized void shutdown(boolean force) {
125 for (Destination destination : _destinations.values()) {
126 destination.close(force);
127 }
128 }
129
130 public synchronized boolean unregisterMessageListener(
131 String destination, MessageListener listener) {
132
133 Destination destinationModel = _destinations.get(destination);
134
135 if (destinationModel == null) {
136 return false;
137 }
138
139 return destinationModel.unregister(listener);
140 }
141
142 protected void fireDestinationAddedEvent(Destination destination) {
143 for (DestinationEventListener listener : _destinationEventListeners) {
144 listener.destinationAdded(destination);
145 }
146 }
147
148 protected void fireDestinationRemovedEvent(Destination destination) {
149 for (DestinationEventListener listener : _destinationEventListeners) {
150 listener.destinationRemoved(destination);
151 }
152 }
153
154 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
155
156 private List<DestinationEventListener> _destinationEventListeners =
157 new ArrayList<DestinationEventListener>();
158 private Map<String, Destination> _destinations =
159 new HashMap<String, Destination>();
160
161 }