1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
18  import com.liferay.portal.kernel.cluster.Address;
19  import com.liferay.portal.kernel.cluster.ClusterException;
20  import com.liferay.portal.kernel.cluster.ClusterMessageType;
21  import com.liferay.portal.kernel.cluster.ClusterNode;
22  import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
23  import com.liferay.portal.kernel.cluster.ClusterRequest;
24  import com.liferay.portal.kernel.cluster.FutureClusterResponses;
25  import com.liferay.portal.kernel.log.Log;
26  import com.liferay.portal.kernel.log.LogFactoryUtil;
27  import com.liferay.portal.kernel.util.MethodHandler;
28  
29  import java.io.Serializable;
30  
31  import java.util.ArrayList;
32  import java.util.Iterator;
33  import java.util.List;
34  
35  import org.jgroups.Channel;
36  import org.jgroups.ChannelException;
37  import org.jgroups.Message;
38  import org.jgroups.View;
39  
40  /**
41   * <a href="ClusterRequestReceiver.java.html"><b><i>View Source</i></b></a>
42   *
43   * @author Michael C. Han
44   * @author Tina Tian
45   */
46  public class ClusterRequestReceiver extends BaseReceiver {
47  
48      public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
49          _clusterExecutorImpl = clusterExecutorImpl;
50      }
51  
52      public void receive(Message message) {
53          org.jgroups.Address sourceAddress = message.getSrc();
54  
55          Channel controlChannel = _clusterExecutorImpl.getControlChannel();
56  
57          org.jgroups.Address localAddress = controlChannel.getAddress();
58  
59          Object obj = message.getObject();
60  
61          if (obj == null) {
62              if (_log.isWarnEnabled()) {
63                  _log.warn("Message content is null");
64              }
65  
66              return;
67          }
68  
69          if (localAddress.equals(sourceAddress)) {
70              boolean isProcessed = processLocalMessage(obj, sourceAddress);
71  
72              if (isProcessed) {
73                  return;
74              }
75          }
76  
77          if (obj instanceof ClusterRequest) {
78              ClusterRequest clusterRequest = (ClusterRequest)obj;
79  
80              processClusterRequest(clusterRequest, sourceAddress, localAddress);
81          }
82          else if (obj instanceof ClusterNodeResponse) {
83              ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
84  
85              processClusterResponse(
86                  clusterNodeResponse, sourceAddress, localAddress);
87          }
88          else {
89              if (_log.isWarnEnabled()) {
90                  _log.warn(
91                      "Unable to process message content of type " +
92                          obj.getClass().getName());
93              }
94          }
95      }
96  
97      public void viewAccepted(View view) {
98          if (_log.isDebugEnabled()) {
99              _log.debug("Accepted view " + view);
100         }
101 
102         if (_lastView == null) {
103             _lastView = view;
104 
105             return;
106         }
107 
108         List<Address> departAddresses = getDepartAddresses(view);
109 
110         _lastView = view;
111 
112         if (departAddresses.isEmpty()) {
113             return;
114         }
115 
116         _clusterExecutorImpl.memberRemoved(departAddresses);
117     }
118 
119     protected Object invoke(
120             String servletContextName, MethodHandler methodHandler)
121         throws Exception {
122 
123         if (servletContextName == null) {
124             return methodHandler.invoke(true);
125         }
126 
127         Thread currentThread = Thread.currentThread();
128 
129         ClassLoader contextClassLoader = currentThread.getContextClassLoader();
130 
131         try {
132             ClassLoader classLoader =
133                 (ClassLoader)PortletBeanLocatorUtil.locate(
134                     servletContextName, "portletClassLoader");
135 
136             currentThread.setContextClassLoader(classLoader);
137 
138             return methodHandler.invoke(true);
139         }
140         catch(Exception e) {
141             throw e;
142         }
143         finally {
144             currentThread.setContextClassLoader(contextClassLoader);
145         }
146     }
147 
148     protected List<Address> getDepartAddresses(View view) {
149         List<Address> departAddresses = new ArrayList<Address>();
150 
151         List<org.jgroups.Address> jGroupsAddresses = view.getMembers();
152         List<org.jgroups.Address> lastJGroupsAddresses =
153             _lastView.getMembers();
154 
155         List<org.jgroups.Address> tempAddresses =
156             new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
157 
158         tempAddresses.addAll(jGroupsAddresses);
159 
160         List<org.jgroups.Address> lastAddresses =
161             new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
162 
163         lastAddresses.addAll(lastJGroupsAddresses);
164 
165         tempAddresses.retainAll(lastJGroupsAddresses);
166         lastAddresses.removeAll(tempAddresses);
167 
168         if (!lastAddresses.isEmpty()) {
169             Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
170 
171             while (itr.hasNext()) {
172                 departAddresses.add(new AddressImpl(itr.next()));
173             }
174         }
175 
176         return departAddresses;
177     }
178 
179     protected void processClusterRequest(
180         ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
181         org.jgroups.Address localAddress) {
182 
183         ClusterMessageType clusterMessageType =
184             clusterRequest.getClusterMessageType();
185 
186         ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
187 
188         try {
189             ClusterNode localClusterNode =
190                 _clusterExecutorImpl.getLocalClusterNode();
191 
192             clusterNodeResponse.setClusterNode(localClusterNode);
193         }
194         catch (Exception e) {
195             clusterNodeResponse.setException(e);
196         }
197 
198         if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
199             clusterMessageType.equals(ClusterMessageType.UPDATE)) {
200 
201             ClusterNode originatingClusterNode =
202                 clusterRequest.getOriginatingClusterNode();
203 
204             if (originatingClusterNode != null) {
205                 _clusterExecutorImpl.memberJoined(
206                     new AddressImpl(sourceAddress), originatingClusterNode);
207 
208                 clusterNodeResponse.setClusterMessageType(clusterMessageType);
209             }
210             else {
211                 if (_log.isWarnEnabled()) {
212                     _log.warn(
213                         "Content of notify message does not contain cluster " +
214                             "node information");
215                 }
216 
217                 return;
218             }
219         }
220         else {
221             clusterNodeResponse.setClusterMessageType(
222                 ClusterMessageType.EXECUTE);
223             clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
224             clusterNodeResponse.setUuid(clusterRequest.getUuid());
225 
226             MethodHandler methodHandler = clusterRequest.getMethodHandler();
227 
228             if (methodHandler != null) {
229                 try {
230                     ClusterInvokeThreadLocal.setEnabled(false);
231 
232                     Object returnValue = invoke(
233                         clusterRequest.getServletContextName(), methodHandler);
234 
235                     if (returnValue instanceof Serializable) {
236                         clusterNodeResponse.setResult(returnValue);
237                     }
238                     else if (returnValue != null) {
239                         clusterNodeResponse.setException(
240                             new ClusterException(
241                                 "Return value is not serializable"));
242                     }
243                 }
244                 catch (Exception e) {
245                     clusterNodeResponse.setException(e);
246 
247                     _log.error("Failed to invoke method " + methodHandler, e);
248                 }
249                 finally {
250                     ClusterInvokeThreadLocal.setEnabled(true);
251                 }
252             }
253             else {
254                 clusterNodeResponse.setException(
255                     new ClusterException(
256                         "Payload is not of type " +
257                             MethodHandler.class.getName()));
258             }
259         }
260 
261         Channel controlChannel = _clusterExecutorImpl.getControlChannel();
262 
263         try {
264             controlChannel.send(
265                 sourceAddress, localAddress, clusterNodeResponse);
266         }
267         catch (ChannelException ce) {
268             _log.error(
269                 "Unable to send response message " + clusterNodeResponse, ce);
270         }
271         catch (Throwable t) {
272             _log.error(t, t);
273         }
274     }
275 
276     protected void processClusterResponse(
277         ClusterNodeResponse clusterNodeResponse,
278         org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
279 
280         ClusterMessageType clusterMessageType =
281             clusterNodeResponse.getClusterMessageType();
282 
283         if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
284             clusterMessageType.equals(ClusterMessageType.UPDATE)) {
285 
286             ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
287 
288             if (clusterNode != null) {
289                 Address joinAddress = new AddressImpl(sourceAddress);
290 
291                 _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
292             }
293             else {
294                 if (_log.isWarnEnabled()) {
295                     _log.warn(
296                         "Response of notify message does not contain cluster " +
297                             "node information");
298                 }
299             }
300 
301             return;
302         }
303 
304         String uuid = clusterNodeResponse.getUuid();
305 
306         FutureClusterResponses futureClusterResponses =
307             _clusterExecutorImpl.getExecutionResults(uuid);
308 
309         if (futureClusterResponses == null) {
310             if (_log.isInfoEnabled()) {
311                 _log.info("Unable to find response container for " + uuid);
312             }
313 
314             return;
315         }
316 
317         Address address = new AddressImpl(sourceAddress);
318 
319         if (futureClusterResponses.expectsReply(address)) {
320             futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
321         }
322         else {
323             if (_log.isWarnEnabled()) {
324                 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
325             }
326         }
327     }
328 
329     protected boolean processLocalMessage(
330         Object message, org.jgroups.Address sourceAddress) {
331 
332         if (message instanceof ClusterRequest) {
333             ClusterRequest clusterRequest = (ClusterRequest)message;
334 
335             if (clusterRequest.isSkipLocal()) {
336                 return true;
337             }
338 
339             ClusterMessageType clusterMessageType =
340                 clusterRequest.getClusterMessageType();
341 
342             if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
343                 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
344 
345                 ClusterNode originatingClusterNode =
346                     clusterRequest.getOriginatingClusterNode();
347 
348                 if (originatingClusterNode != null) {
349                     Address joinAddress = new AddressImpl(sourceAddress);
350 
351                     _clusterExecutorImpl.memberJoined(
352                         joinAddress, originatingClusterNode);
353                 }
354                 else {
355                     if (_log.isWarnEnabled()) {
356                         _log.warn(
357                             "Content of notify message does not contain " +
358                                 "cluster node information");
359                     }
360                 }
361 
362                 return true;
363             }
364         }
365 
366         if (_clusterExecutorImpl.isShortcutLocalMethod()) {
367             return true;
368         }
369 
370         return false;
371     }
372 
373     private static Log _log = LogFactoryUtil.getLog(
374         ClusterRequestReceiver.class);
375 
376     private ClusterExecutorImpl _clusterExecutorImpl;
377     private View _lastView;
378 
379 }