001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
018    import com.liferay.portal.kernel.cluster.Address;
019    import com.liferay.portal.kernel.cluster.ClusterException;
020    import com.liferay.portal.kernel.cluster.ClusterMessageType;
021    import com.liferay.portal.kernel.cluster.ClusterNode;
022    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023    import com.liferay.portal.kernel.cluster.ClusterRequest;
024    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025    import com.liferay.portal.kernel.log.Log;
026    import com.liferay.portal.kernel.log.LogFactoryUtil;
027    import com.liferay.portal.kernel.util.MethodHandler;
028    
029    import java.io.Serializable;
030    
031    import java.util.ArrayList;
032    import java.util.Iterator;
033    import java.util.List;
034    
035    import org.jgroups.Channel;
036    import org.jgroups.ChannelException;
037    import org.jgroups.Message;
038    import org.jgroups.View;
039    
040    /**
041     * @author Michael C. Han
042     * @author Tina Tian
043     */
044    public class ClusterRequestReceiver extends BaseReceiver {
045    
046            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047                    _clusterExecutorImpl = clusterExecutorImpl;
048            }
049    
050            public void receive(Message message) {
051                    org.jgroups.Address sourceAddress = message.getSrc();
052    
053                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
054    
055                    org.jgroups.Address localAddress = controlChannel.getAddress();
056    
057                    Object obj = message.getObject();
058    
059                    if (obj == null) {
060                            if (_log.isWarnEnabled()) {
061                                    _log.warn("Message content is null");
062                            }
063    
064                            return;
065                    }
066    
067                    if (localAddress.equals(sourceAddress)) {
068                            boolean isProcessed = processLocalMessage(obj, sourceAddress);
069    
070                            if (isProcessed) {
071                                    return;
072                            }
073                    }
074    
075                    if (obj instanceof ClusterRequest) {
076                            ClusterRequest clusterRequest = (ClusterRequest)obj;
077    
078                            processClusterRequest(clusterRequest, sourceAddress, localAddress);
079                    }
080                    else if (obj instanceof ClusterNodeResponse) {
081                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
082    
083                            processClusterResponse(
084                                    clusterNodeResponse, sourceAddress, localAddress);
085                    }
086                    else {
087                            if (_log.isWarnEnabled()) {
088                                    _log.warn(
089                                            "Unable to process message content of type " +
090                                                    obj.getClass().getName());
091                            }
092                    }
093            }
094    
095            public void viewAccepted(View view) {
096                    if (_log.isDebugEnabled()) {
097                            _log.debug("Accepted view " + view);
098                    }
099    
100                    if (_lastView == null) {
101                            _lastView = view;
102    
103                            return;
104                    }
105    
106                    List<Address> departAddresses = getDepartAddresses(view);
107    
108                    _lastView = view;
109    
110                    if (departAddresses.isEmpty()) {
111                            return;
112                    }
113    
114                    _clusterExecutorImpl.memberRemoved(departAddresses);
115            }
116    
117            protected Object invoke(
118                            String servletContextName, MethodHandler methodHandler)
119                    throws Exception {
120    
121                    if (servletContextName == null) {
122                            return methodHandler.invoke(true);
123                    }
124    
125                    Thread currentThread = Thread.currentThread();
126    
127                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
128    
129                    try {
130                            ClassLoader classLoader =
131                                    (ClassLoader)PortletBeanLocatorUtil.locate(
132                                            servletContextName, "portletClassLoader");
133    
134                            currentThread.setContextClassLoader(classLoader);
135    
136                            return methodHandler.invoke(true);
137                    }
138                    catch(Exception e) {
139                            throw e;
140                    }
141                    finally {
142                            currentThread.setContextClassLoader(contextClassLoader);
143                    }
144            }
145    
146            protected List<Address> getDepartAddresses(View view) {
147                    List<Address> departAddresses = new ArrayList<Address>();
148    
149                    List<org.jgroups.Address> jGroupsAddresses = view.getMembers();
150                    List<org.jgroups.Address> lastJGroupsAddresses =
151                            _lastView.getMembers();
152    
153                    List<org.jgroups.Address> tempAddresses =
154                            new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
155    
156                    tempAddresses.addAll(jGroupsAddresses);
157    
158                    List<org.jgroups.Address> lastAddresses =
159                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
160    
161                    lastAddresses.addAll(lastJGroupsAddresses);
162    
163                    tempAddresses.retainAll(lastJGroupsAddresses);
164                    lastAddresses.removeAll(tempAddresses);
165    
166                    if (!lastAddresses.isEmpty()) {
167                            Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
168    
169                            while (itr.hasNext()) {
170                                    departAddresses.add(new AddressImpl(itr.next()));
171                            }
172                    }
173    
174                    return departAddresses;
175            }
176    
177            protected void processClusterRequest(
178                    ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
179                    org.jgroups.Address localAddress) {
180    
181                    ClusterMessageType clusterMessageType =
182                            clusterRequest.getClusterMessageType();
183    
184                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
185    
186                    try {
187                            ClusterNode localClusterNode =
188                                    _clusterExecutorImpl.getLocalClusterNode();
189    
190                            clusterNodeResponse.setClusterNode(localClusterNode);
191                    }
192                    catch (Exception e) {
193                            clusterNodeResponse.setException(e);
194                    }
195    
196                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
197                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
198    
199                            ClusterNode originatingClusterNode =
200                                    clusterRequest.getOriginatingClusterNode();
201    
202                            if (originatingClusterNode != null) {
203                                    _clusterExecutorImpl.memberJoined(
204                                            new AddressImpl(sourceAddress), originatingClusterNode);
205    
206                                    clusterNodeResponse.setClusterMessageType(clusterMessageType);
207                            }
208                            else {
209                                    if (_log.isWarnEnabled()) {
210                                            _log.warn(
211                                                    "Content of notify message does not contain cluster " +
212                                                            "node information");
213                                    }
214    
215                                    return;
216                            }
217                    }
218                    else {
219                            clusterNodeResponse.setClusterMessageType(
220                                    ClusterMessageType.EXECUTE);
221                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
222                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
223    
224                            MethodHandler methodHandler = clusterRequest.getMethodHandler();
225    
226                            if (methodHandler != null) {
227                                    try {
228                                            ClusterInvokeThreadLocal.setEnabled(false);
229    
230                                            Object returnValue = invoke(
231                                                    clusterRequest.getServletContextName(), methodHandler);
232    
233                                            if (returnValue instanceof Serializable) {
234                                                    clusterNodeResponse.setResult(returnValue);
235                                            }
236                                            else if (returnValue != null) {
237                                                    clusterNodeResponse.setException(
238                                                            new ClusterException(
239                                                                    "Return value is not serializable"));
240                                            }
241                                    }
242                                    catch (Exception e) {
243                                            clusterNodeResponse.setException(e);
244    
245                                            _log.error("Failed to invoke method " + methodHandler, e);
246                                    }
247                                    finally {
248                                            ClusterInvokeThreadLocal.setEnabled(true);
249                                    }
250                            }
251                            else {
252                                    clusterNodeResponse.setException(
253                                            new ClusterException(
254                                                    "Payload is not of type " +
255                                                            MethodHandler.class.getName()));
256                            }
257                    }
258    
259                    Channel controlChannel = _clusterExecutorImpl.getControlChannel();
260    
261                    try {
262                            controlChannel.send(
263                                    sourceAddress, localAddress, clusterNodeResponse);
264                    }
265                    catch (ChannelException ce) {
266                            _log.error(
267                                    "Unable to send response message " + clusterNodeResponse, ce);
268                    }
269                    catch (Throwable t) {
270                            _log.error(t, t);
271                    }
272            }
273    
274            protected void processClusterResponse(
275                    ClusterNodeResponse clusterNodeResponse,
276                    org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
277    
278                    ClusterMessageType clusterMessageType =
279                            clusterNodeResponse.getClusterMessageType();
280    
281                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
282                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
283    
284                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
285    
286                            if (clusterNode != null) {
287                                    Address joinAddress = new AddressImpl(sourceAddress);
288    
289                                    _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
290                            }
291                            else {
292                                    if (_log.isWarnEnabled()) {
293                                            _log.warn(
294                                                    "Response of notify message does not contain cluster " +
295                                                            "node information");
296                                    }
297                            }
298    
299                            return;
300                    }
301    
302                    String uuid = clusterNodeResponse.getUuid();
303    
304                    FutureClusterResponses futureClusterResponses =
305                            _clusterExecutorImpl.getExecutionResults(uuid);
306    
307                    if (futureClusterResponses == null) {
308                            if (_log.isInfoEnabled()) {
309                                    _log.info("Unable to find response container for " + uuid);
310                            }
311    
312                            return;
313                    }
314    
315                    Address address = new AddressImpl(sourceAddress);
316    
317                    if (futureClusterResponses.expectsReply(address)) {
318                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
319                    }
320                    else {
321                            if (_log.isWarnEnabled()) {
322                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
323                            }
324                    }
325            }
326    
327            protected boolean processLocalMessage(
328                    Object message, org.jgroups.Address sourceAddress) {
329    
330                    if (message instanceof ClusterRequest) {
331                            ClusterRequest clusterRequest = (ClusterRequest)message;
332    
333                            if (clusterRequest.isSkipLocal()) {
334                                    return true;
335                            }
336    
337                            ClusterMessageType clusterMessageType =
338                                    clusterRequest.getClusterMessageType();
339    
340                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
341                                    clusterMessageType.equals(ClusterMessageType.UPDATE)) {
342    
343                                    ClusterNode originatingClusterNode =
344                                            clusterRequest.getOriginatingClusterNode();
345    
346                                    if (originatingClusterNode != null) {
347                                            Address joinAddress = new AddressImpl(sourceAddress);
348    
349                                            _clusterExecutorImpl.memberJoined(
350                                                    joinAddress, originatingClusterNode);
351                                    }
352                                    else {
353                                            if (_log.isWarnEnabled()) {
354                                                    _log.warn(
355                                                            "Content of notify message does not contain " +
356                                                                    "cluster node information");
357                                            }
358                                    }
359    
360                                    return true;
361                            }
362                    }
363    
364                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
365                            return true;
366                    }
367    
368                    return false;
369            }
370    
371            private static Log _log = LogFactoryUtil.getLog(
372                    ClusterRequestReceiver.class);
373    
374            private ClusterExecutorImpl _clusterExecutorImpl;
375            private View _lastView;
376    
377    }