1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.SystemException;
18 import com.liferay.portal.kernel.cluster.Address;
19 import com.liferay.portal.kernel.cluster.ClusterEvent;
20 import com.liferay.portal.kernel.cluster.ClusterEventListener;
21 import com.liferay.portal.kernel.cluster.ClusterException;
22 import com.liferay.portal.kernel.cluster.ClusterExecutor;
23 import com.liferay.portal.kernel.cluster.ClusterMessageType;
24 import com.liferay.portal.kernel.cluster.ClusterNode;
25 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
26 import com.liferay.portal.kernel.cluster.ClusterRequest;
27 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
28 import com.liferay.portal.kernel.log.Log;
29 import com.liferay.portal.kernel.log.LogFactoryUtil;
30 import com.liferay.portal.kernel.util.InetAddressUtil;
31 import com.liferay.portal.kernel.util.MethodHandler;
32 import com.liferay.portal.kernel.util.PropsKeys;
33 import com.liferay.portal.kernel.util.PropsUtil;
34 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
35 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
36 import com.liferay.portal.util.PortalPortEventListener;
37 import com.liferay.portal.util.PortalUtil;
38 import com.liferay.portal.util.PropsValues;
39
40 import java.io.Serializable;
41
42 import java.net.InetAddress;
43
44 import java.util.ArrayList;
45 import java.util.Collection;
46 import java.util.Collections;
47 import java.util.List;
48 import java.util.Map;
49 import java.util.Properties;
50 import java.util.concurrent.ConcurrentHashMap;
51 import java.util.concurrent.CopyOnWriteArrayList;
52
53 import org.jgroups.ChannelException;
54 import org.jgroups.JChannel;
55
56
62 public class ClusterExecutorImpl
63 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
64
65 public void addClusterEventListener(
66 ClusterEventListener clusterEventListener) {
67 if (!isEnabled()) {
68 return;
69 }
70
71 _clusterEventListeners.addIfAbsent(clusterEventListener);
72 }
73
74 public void afterPropertiesSet() {
75 super.afterPropertiesSet();
76
77 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
78 addClusterEventListener(new DebuggingClusterEventListenerImpl());
79 }
80 }
81
82 public void destroy() {
83 if (!isEnabled()) {
84 return;
85 }
86
87 _controlChannel.close();
88 }
89
90 public FutureClusterResponses execute(ClusterRequest clusterRequest)
91 throws SystemException {
92
93 if (!isEnabled()) {
94 return null;
95 }
96
97 List<Address> addresses = prepareAddresses(clusterRequest);
98
99 FutureClusterResponses futureClusterResponses =
100 new FutureClusterResponses(addresses);
101
102 if (!clusterRequest.isFireAndForget()) {
103 String uuid = clusterRequest.getUuid();
104
105 _executionResultMap.put(uuid, futureClusterResponses);
106 }
107
108 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
109 addresses.remove(getLocalControlAddress())) {
110
111 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
112 clusterRequest.getMethodHandler());
113
114 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
115 clusterNodeResponse.setUuid(clusterRequest.getUuid());
116
117 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
118 }
119
120 if (clusterRequest.isMulticast()) {
121 sendMulticastRequest(clusterRequest);
122 }
123 else {
124 sendUnicastRequest(clusterRequest, addresses);
125 }
126
127 return futureClusterResponses;
128 }
129
130 public List<ClusterEventListener> getClusterEventListeners() {
131 if (!isEnabled()) {
132 return Collections.EMPTY_LIST;
133 }
134
135 return Collections.unmodifiableList(_clusterEventListeners);
136 }
137
138 public List<ClusterNode> getClusterNodes() {
139 if (!isEnabled()) {
140 return Collections.EMPTY_LIST;
141 }
142
143 return new ArrayList<ClusterNode>(_addressMap.values());
144 }
145
146 public ClusterNode getLocalClusterNode() throws SystemException {
147 if (!isEnabled()) {
148 return null;
149 }
150
151 ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
152
153 if (clusterNode == null) {
154 _localClusterNodeId = PortalUUIDUtil.generate();
155
156 clusterNode = new ClusterNode(_localClusterNodeId);
157
158 clusterNode.setPort(PortalUtil.getPortalPort());
159
160 try {
161 InetAddress inetAddress = bindInetAddress;
162
163 if (inetAddress == null) {
164 inetAddress = InetAddressUtil.getLocalInetAddress();
165 }
166
167 clusterNode.setInetAddress(inetAddress);
168
169 clusterNode.setHostName(inetAddress.getHostName());
170 }
171 catch (Exception e) {
172 throw new SystemException(
173 "Unable to determine local network address", e);
174 }
175 }
176
177 return clusterNode;
178 }
179
180 public void initialize() {
181 if (!isEnabled()) {
182 return;
183 }
184
185 try {
186 PortalUtil.addPortalPortEventListener(this);
187
188 ClusterNode clusterNode = getLocalClusterNode();
189
190 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
191 ClusterMessageType.NOTIFY, clusterNode);
192
193 _controlChannel.send(null, null, clusterRequest);
194 }
195 catch (ChannelException ce) {
196 _log.error("Unable to send multicast message ", ce);
197 }
198 catch (SystemException se) {
199 _log.error("Unable to determine local network address", se);
200 }
201 }
202
203 public boolean isClusterNodeAlive(String clusterNodeId) {
204 if (!isEnabled()) {
205 return false;
206 }
207
208 return _clusterNodeIdMap.containsKey(clusterNodeId);
209 }
210
211 public boolean isEnabled() {
212 return PropsValues.CLUSTER_LINK_ENABLED;
213 }
214
215 public void portalPortConfigured(int port) {
216 if (!isEnabled()) {
217 return;
218 }
219
220 try {
221 ClusterNode clusterNode = getLocalClusterNode();
222
223 clusterNode.setPort(port);
224
225 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
226 ClusterMessageType.UPDATE, clusterNode);
227
228 _controlChannel.send(null, null, clusterRequest);
229 }
230 catch (Exception e) {
231 if (_log.isErrorEnabled()) {
232 _log.error("Unable to determine configure node port", e);
233 }
234 }
235 }
236
237 public void removeClusterEventListener(
238 ClusterEventListener clusterEventListener) {
239
240 if (!isEnabled()) {
241 return;
242 }
243
244 _clusterEventListeners.remove(clusterEventListener);
245 }
246
247 public void setClusterEventListeners(
248 List<ClusterEventListener> clusterEventListeners) {
249
250 if (!isEnabled()) {
251 return;
252 }
253
254 _clusterEventListeners.addAllAbsent(clusterEventListeners);
255 }
256
257 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
258 if (!isEnabled()) {
259 return;
260 }
261
262 _shortcutLocalMethod = shortcutLocalMethod;
263 }
264
265 protected void fireClusterEvent(ClusterEvent clusterEvent) {
266 for (ClusterEventListener listener : _clusterEventListeners) {
267 listener.processClusterEvent(clusterEvent);
268 }
269 }
270
271 protected JChannel getControlChannel() {
272 return _controlChannel;
273 }
274
275 protected FutureClusterResponses getExecutionResults(String uuid) {
276 return _executionResultMap.get(uuid);
277 }
278
279 protected Address getLocalControlAddress() {
280 return new AddressImpl(_controlChannel.getLocalAddress());
281 }
282
283 protected void initChannels() {
284 Properties controlProperties = PropsUtil.getProperties(
285 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
286
287 String controlProperty = controlProperties.getProperty(
288 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
289
290 ClusterRequestReceiver clusterInvokeReceiver =
291 new ClusterRequestReceiver(this);
292
293 try {
294 _controlChannel = createJChannel(
295 controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
296 }
297 catch (ChannelException ce) {
298 _log.error(ce, ce);
299 }
300 catch (Exception e) {
301 _log.error(e, e);
302 }
303 }
304
305 protected boolean isShortcutLocalMethod() {
306 return _shortcutLocalMethod;
307 }
308
309 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
310 _addressMap.put(joinAddress, clusterNode);
311
312 Address previousAddress = _clusterNodeIdMap.put(
313 clusterNode.getClusterNodeId(), joinAddress);
314
315 if ((previousAddress == null) &&
316 !getLocalControlAddress().equals(joinAddress)) {
317
318 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
319
320 fireClusterEvent(clusterEvent);
321 }
322 }
323
324 protected void memberRemoved(List<Address> departAddresses) {
325 List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
326
327 for (Address departAddress : departAddresses) {
328 ClusterNode departingClusterNode = _addressMap.remove(
329 departAddress);
330 if (departingClusterNode != null) {
331 departingClusterNodes.add(departingClusterNode);
332
333 _clusterNodeIdMap.remove(
334 departingClusterNode.getClusterNodeId());
335 }
336 }
337
338 if (departingClusterNodes.isEmpty()) {
339 return;
340 }
341
342 ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
343
344 fireClusterEvent(clusterEvent);
345 }
346
347 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
348 boolean isMulticast = clusterRequest.isMulticast();
349
350 List<Address> addresses = null;
351
352 if (isMulticast) {
353 addresses = getAddresses(_controlChannel);
354 }
355 else {
356 Collection<String> clusterNodeIds =
357 clusterRequest.getTargetClusterNodeIds();
358
359 addresses = new ArrayList<Address>(clusterNodeIds.size());
360
361 for (String clusterNodeId : clusterNodeIds) {
362 Address address = _clusterNodeIdMap.get(clusterNodeId);
363
364 addresses.add(address);
365 }
366 }
367
368 return addresses;
369 }
370
371 protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler)
372 throws SystemException {
373
374 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
375
376 ClusterNode localClusterNode = getLocalClusterNode();
377
378 clusterNodeResponse.setClusterNode(localClusterNode);
379 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
380
381 if (methodHandler == null) {
382 clusterNodeResponse.setException(
383 new ClusterException(
384 "Payload is not of type " + MethodHandler.class.getName()));
385
386 return clusterNodeResponse;
387 }
388
389 try {
390 Object returnValue = methodHandler.invoke(true);
391
392 if (returnValue instanceof Serializable) {
393 clusterNodeResponse.setResult(returnValue);
394 }
395 else if (returnValue != null) {
396 clusterNodeResponse.setException(
397 new ClusterException("Return value is not serializable"));
398 }
399 }
400 catch (Exception e) {
401 clusterNodeResponse.setException(e);
402 }
403
404 return clusterNodeResponse;
405 }
406
407 protected void sendMulticastRequest(ClusterRequest clusterRequest)
408 throws SystemException {
409
410 try {
411 _controlChannel.send(null, null, clusterRequest);
412 }
413 catch (ChannelException ce) {
414 _log.error(
415 "Unable to send multicast message " + clusterRequest, ce);
416
417 throw new SystemException(
418 "Unable to send multicast request", ce);
419 }
420 }
421
422 protected void sendUnicastRequest(
423 ClusterRequest clusterRequest, List<Address> addresses)
424 throws SystemException {
425
426 for (Address address : addresses) {
427 org.jgroups.Address jGroupsAddress =
428 (org.jgroups.Address)address.getRealAddress();
429
430 try {
431 _controlChannel.send(jGroupsAddress, null, clusterRequest);
432 }
433 catch (ChannelException ce) {
434 _log.error(
435 "Unable to send unicast message " + clusterRequest, ce);
436
437 throw new SystemException(
438 "Unable to send unicast request", ce);
439 }
440 }
441 }
442
443 private static final String _DEFAULT_CLUSTER_NAME =
444 "LIFERAY-CONTROL-CHANNEL";
445
446 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
447
448 private Map<Address, ClusterNode> _addressMap =
449 new ConcurrentHashMap<Address, ClusterNode>();
450 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
451 new CopyOnWriteArrayList<ClusterEventListener>();
452 private Map<String, Address> _clusterNodeIdMap =
453 new ConcurrentHashMap<String, Address>();
454 private JChannel _controlChannel;
455 private Map<String, FutureClusterResponses> _executionResultMap =
456 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
457 private String _localClusterNodeId;
458 private boolean _shortcutLocalMethod;
459
460 }