1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.SystemException;
18  import com.liferay.portal.kernel.cluster.Address;
19  import com.liferay.portal.kernel.cluster.ClusterEvent;
20  import com.liferay.portal.kernel.cluster.ClusterEventListener;
21  import com.liferay.portal.kernel.cluster.ClusterException;
22  import com.liferay.portal.kernel.cluster.ClusterExecutor;
23  import com.liferay.portal.kernel.cluster.ClusterMessageType;
24  import com.liferay.portal.kernel.cluster.ClusterNode;
25  import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
26  import com.liferay.portal.kernel.cluster.ClusterRequest;
27  import com.liferay.portal.kernel.cluster.FutureClusterResponses;
28  import com.liferay.portal.kernel.log.Log;
29  import com.liferay.portal.kernel.log.LogFactoryUtil;
30  import com.liferay.portal.kernel.util.InetAddressUtil;
31  import com.liferay.portal.kernel.util.MethodHandler;
32  import com.liferay.portal.kernel.util.PropsKeys;
33  import com.liferay.portal.kernel.util.PropsUtil;
34  import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
35  import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
36  import com.liferay.portal.util.PortalPortEventListener;
37  import com.liferay.portal.util.PortalUtil;
38  import com.liferay.portal.util.PropsValues;
39  
40  import java.io.Serializable;
41  
42  import java.net.InetAddress;
43  
44  import java.util.ArrayList;
45  import java.util.Collection;
46  import java.util.Collections;
47  import java.util.List;
48  import java.util.Map;
49  import java.util.Properties;
50  import java.util.concurrent.ConcurrentHashMap;
51  import java.util.concurrent.CopyOnWriteArrayList;
52  
53  import org.jgroups.ChannelException;
54  import org.jgroups.JChannel;
55  
56  /**
57   * <a href="ClusterExecutorImpl.java.html"><b><i>View Source</i></b></a>
58   *
59   * @author Tina Tian
60   * @author Shuyang Zhou
61   */
62  public class ClusterExecutorImpl
63      extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
64  
65      public void addClusterEventListener(
66          ClusterEventListener clusterEventListener) {
67          if (!isEnabled()) {
68              return;
69          }
70  
71          _clusterEventListeners.addIfAbsent(clusterEventListener);
72      }
73  
74      public void afterPropertiesSet() {
75          super.afterPropertiesSet();
76  
77          if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
78              addClusterEventListener(new DebuggingClusterEventListenerImpl());
79          }
80      }
81  
82      public void destroy() {
83          if (!isEnabled()) {
84              return;
85          }
86  
87          _controlChannel.close();
88      }
89  
90      public FutureClusterResponses execute(ClusterRequest clusterRequest)
91          throws SystemException {
92  
93          if (!isEnabled()) {
94              return null;
95          }
96  
97          List<Address> addresses = prepareAddresses(clusterRequest);
98  
99          FutureClusterResponses futureClusterResponses =
100             new FutureClusterResponses(addresses);
101 
102         if (!clusterRequest.isFireAndForget()) {
103             String uuid = clusterRequest.getUuid();
104 
105             _executionResultMap.put(uuid, futureClusterResponses);
106         }
107 
108         if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
109             addresses.remove(getLocalControlAddress())) {
110 
111             ClusterNodeResponse clusterNodeResponse = runLocalMethod(
112                 clusterRequest.getMethodHandler());
113 
114             clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
115             clusterNodeResponse.setUuid(clusterRequest.getUuid());
116 
117             futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
118         }
119 
120         if (clusterRequest.isMulticast()) {
121             sendMulticastRequest(clusterRequest);
122         }
123         else {
124             sendUnicastRequest(clusterRequest, addresses);
125         }
126 
127         return futureClusterResponses;
128     }
129 
130     public List<ClusterEventListener> getClusterEventListeners() {
131         if (!isEnabled()) {
132             return Collections.EMPTY_LIST;
133         }
134 
135         return Collections.unmodifiableList(_clusterEventListeners);
136     }
137 
138     public List<ClusterNode> getClusterNodes() {
139         if (!isEnabled()) {
140             return Collections.EMPTY_LIST;
141         }
142 
143         return new ArrayList<ClusterNode>(_addressMap.values());
144     }
145 
146     public ClusterNode getLocalClusterNode() throws SystemException {
147         if (!isEnabled()) {
148             return null;
149         }
150 
151         ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
152 
153         if (clusterNode == null) {
154             _localClusterNodeId = PortalUUIDUtil.generate();
155 
156             clusterNode = new ClusterNode(_localClusterNodeId);
157 
158             clusterNode.setPort(PortalUtil.getPortalPort());
159 
160             try {
161                 InetAddress inetAddress = bindInetAddress;
162 
163                 if (inetAddress == null) {
164                     inetAddress = InetAddressUtil.getLocalInetAddress();
165                 }
166 
167                 clusterNode.setInetAddress(inetAddress);
168 
169                 clusterNode.setHostName(inetAddress.getHostName());
170             }
171             catch (Exception e) {
172                 throw new SystemException(
173                     "Unable to determine local network address", e);
174             }
175         }
176 
177         return clusterNode;
178     }
179 
180     public void initialize() {
181         if (!isEnabled()) {
182             return;
183         }
184 
185         try {
186             PortalUtil.addPortalPortEventListener(this);
187 
188             ClusterNode clusterNode = getLocalClusterNode();
189 
190             ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
191                 ClusterMessageType.NOTIFY, clusterNode);
192 
193             _controlChannel.send(null, null, clusterRequest);
194         }
195         catch (ChannelException ce) {
196             _log.error("Unable to send multicast message ", ce);
197         }
198         catch (SystemException se) {
199             _log.error("Unable to determine local network address", se);
200         }
201     }
202 
203     public boolean isClusterNodeAlive(String clusterNodeId) {
204         if (!isEnabled()) {
205             return false;
206         }
207 
208         return _clusterNodeIdMap.containsKey(clusterNodeId);
209     }
210 
211     public boolean isEnabled() {
212         return PropsValues.CLUSTER_LINK_ENABLED;
213     }
214 
215     public void portalPortConfigured(int port) {
216         if (!isEnabled()) {
217             return;
218         }
219 
220         try {
221             ClusterNode clusterNode = getLocalClusterNode();
222 
223             clusterNode.setPort(port);
224 
225             ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
226                 ClusterMessageType.UPDATE, clusterNode);
227 
228             _controlChannel.send(null, null, clusterRequest);
229         }
230         catch (Exception e) {
231             if (_log.isErrorEnabled()) {
232                 _log.error("Unable to determine configure node port", e);
233             }
234         }
235     }
236 
237     public void removeClusterEventListener(
238         ClusterEventListener clusterEventListener) {
239 
240         if (!isEnabled()) {
241             return;
242         }
243 
244         _clusterEventListeners.remove(clusterEventListener);
245     }
246 
247     public void setClusterEventListeners(
248         List<ClusterEventListener> clusterEventListeners) {
249 
250         if (!isEnabled()) {
251             return;
252         }
253 
254         _clusterEventListeners.addAllAbsent(clusterEventListeners);
255     }
256 
257     public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
258         if (!isEnabled()) {
259             return;
260         }
261 
262         _shortcutLocalMethod = shortcutLocalMethod;
263     }
264 
265     protected void fireClusterEvent(ClusterEvent clusterEvent) {
266         for (ClusterEventListener listener : _clusterEventListeners) {
267             listener.processClusterEvent(clusterEvent);
268         }
269     }
270 
271     protected JChannel getControlChannel() {
272         return _controlChannel;
273     }
274 
275     protected FutureClusterResponses getExecutionResults(String uuid) {
276         return _executionResultMap.get(uuid);
277     }
278 
279     protected Address getLocalControlAddress() {
280         return new AddressImpl(_controlChannel.getLocalAddress());
281     }
282 
283     protected void initChannels() {
284         Properties controlProperties = PropsUtil.getProperties(
285             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
286 
287         String controlProperty = controlProperties.getProperty(
288             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
289 
290         ClusterRequestReceiver clusterInvokeReceiver =
291             new ClusterRequestReceiver(this);
292 
293         try {
294             _controlChannel = createJChannel(
295                 controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
296         }
297         catch (ChannelException ce) {
298             _log.error(ce, ce);
299         }
300         catch (Exception e) {
301             _log.error(e, e);
302         }
303     }
304 
305     protected boolean isShortcutLocalMethod() {
306         return _shortcutLocalMethod;
307     }
308 
309     protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
310         _addressMap.put(joinAddress, clusterNode);
311 
312         Address previousAddress = _clusterNodeIdMap.put(
313             clusterNode.getClusterNodeId(), joinAddress);
314 
315         if ((previousAddress == null) &&
316             !getLocalControlAddress().equals(joinAddress)) {
317 
318             ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
319 
320             fireClusterEvent(clusterEvent);
321         }
322     }
323 
324     protected void memberRemoved(List<Address> departAddresses) {
325         List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
326 
327         for (Address departAddress : departAddresses) {
328             ClusterNode departingClusterNode = _addressMap.remove(
329                 departAddress);
330             if (departingClusterNode != null) {
331                 departingClusterNodes.add(departingClusterNode);
332 
333                 _clusterNodeIdMap.remove(
334                     departingClusterNode.getClusterNodeId());
335             }
336         }
337 
338         if (departingClusterNodes.isEmpty()) {
339             return;
340         }
341 
342         ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
343 
344         fireClusterEvent(clusterEvent);
345     }
346 
347     protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
348         boolean isMulticast = clusterRequest.isMulticast();
349 
350         List<Address> addresses = null;
351 
352         if (isMulticast) {
353             addresses = getAddresses(_controlChannel);
354         }
355         else {
356             Collection<String> clusterNodeIds =
357                 clusterRequest.getTargetClusterNodeIds();
358 
359             addresses = new ArrayList<Address>(clusterNodeIds.size());
360 
361             for (String clusterNodeId : clusterNodeIds) {
362                 Address address = _clusterNodeIdMap.get(clusterNodeId);
363 
364                 addresses.add(address);
365             }
366         }
367 
368         return addresses;
369     }
370 
371     protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler)
372         throws SystemException {
373 
374         ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
375 
376         ClusterNode localClusterNode = getLocalClusterNode();
377 
378         clusterNodeResponse.setClusterNode(localClusterNode);
379         clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
380 
381         if (methodHandler == null) {
382             clusterNodeResponse.setException(
383                 new ClusterException(
384                     "Payload is not of type " + MethodHandler.class.getName()));
385 
386             return clusterNodeResponse;
387         }
388 
389         try {
390             Object returnValue = methodHandler.invoke(true);
391 
392             if (returnValue instanceof Serializable) {
393                 clusterNodeResponse.setResult(returnValue);
394             }
395             else if (returnValue != null) {
396                 clusterNodeResponse.setException(
397                     new ClusterException("Return value is not serializable"));
398             }
399         }
400         catch (Exception e) {
401             clusterNodeResponse.setException(e);
402         }
403 
404         return clusterNodeResponse;
405     }
406 
407     protected void sendMulticastRequest(ClusterRequest clusterRequest)
408         throws SystemException {
409 
410         try {
411             _controlChannel.send(null, null, clusterRequest);
412         }
413         catch (ChannelException ce) {
414             _log.error(
415                 "Unable to send multicast message " + clusterRequest, ce);
416 
417             throw new SystemException(
418                 "Unable to send multicast request", ce);
419         }
420     }
421 
422     protected void sendUnicastRequest(
423             ClusterRequest clusterRequest, List<Address> addresses)
424         throws SystemException {
425 
426         for (Address address : addresses) {
427             org.jgroups.Address jGroupsAddress =
428                 (org.jgroups.Address)address.getRealAddress();
429 
430             try {
431                 _controlChannel.send(jGroupsAddress, null, clusterRequest);
432             }
433             catch (ChannelException ce) {
434                 _log.error(
435                     "Unable to send unicast message " + clusterRequest, ce);
436 
437                 throw new SystemException(
438                     "Unable to send unicast request", ce);
439             }
440         }
441     }
442 
443     private static final String _DEFAULT_CLUSTER_NAME =
444         "LIFERAY-CONTROL-CHANNEL";
445 
446     private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
447 
448     private Map<Address, ClusterNode> _addressMap =
449         new ConcurrentHashMap<Address, ClusterNode>();
450     private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
451         new CopyOnWriteArrayList<ClusterEventListener>();
452     private Map<String, Address> _clusterNodeIdMap =
453         new ConcurrentHashMap<String, Address>();
454     private JChannel _controlChannel;
455     private Map<String, FutureClusterResponses> _executionResultMap =
456         new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
457     private String _localClusterNodeId;
458     private boolean _shortcutLocalMethod;
459 
460 }