001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterException;
020 import com.liferay.portal.kernel.cluster.ClusterMessageType;
021 import com.liferay.portal.kernel.cluster.ClusterNode;
022 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
023 import com.liferay.portal.kernel.cluster.ClusterRequest;
024 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
025 import com.liferay.portal.kernel.log.Log;
026 import com.liferay.portal.kernel.log.LogFactoryUtil;
027 import com.liferay.portal.kernel.util.MethodHandler;
028
029 import java.io.Serializable;
030
031 import java.util.ArrayList;
032 import java.util.Iterator;
033 import java.util.List;
034
035 import org.jgroups.Channel;
036 import org.jgroups.ChannelException;
037 import org.jgroups.Message;
038 import org.jgroups.View;
039
040
044 public class ClusterRequestReceiver extends BaseReceiver {
045
046 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
047 _clusterExecutorImpl = clusterExecutorImpl;
048 }
049
050 public void receive(Message message) {
051 org.jgroups.Address sourceAddress = message.getSrc();
052
053 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
054
055 org.jgroups.Address localAddress = controlChannel.getAddress();
056
057 Object obj = message.getObject();
058
059 if (obj == null) {
060 if (_log.isWarnEnabled()) {
061 _log.warn("Message content is null");
062 }
063
064 return;
065 }
066
067 if (localAddress.equals(sourceAddress)) {
068 boolean isProcessed = processLocalMessage(obj, sourceAddress);
069
070 if (isProcessed) {
071 return;
072 }
073 }
074
075 if (obj instanceof ClusterRequest) {
076 ClusterRequest clusterRequest = (ClusterRequest)obj;
077
078 processClusterRequest(clusterRequest, sourceAddress, localAddress);
079 }
080 else if (obj instanceof ClusterNodeResponse) {
081 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
082
083 processClusterResponse(
084 clusterNodeResponse, sourceAddress, localAddress);
085 }
086 else {
087 if (_log.isWarnEnabled()) {
088 _log.warn(
089 "Unable to process message content of type " +
090 obj.getClass().getName());
091 }
092 }
093 }
094
095 public void viewAccepted(View view) {
096 if (_log.isDebugEnabled()) {
097 _log.debug("Accepted view " + view);
098 }
099
100 if (_lastView == null) {
101 _lastView = view;
102
103 return;
104 }
105
106 List<Address> departAddresses = getDepartAddresses(view);
107
108 _lastView = view;
109
110 if (departAddresses.isEmpty()) {
111 return;
112 }
113
114 _clusterExecutorImpl.memberRemoved(departAddresses);
115 }
116
117 protected Object invoke(
118 String servletContextName, MethodHandler methodHandler)
119 throws Exception {
120
121 if (servletContextName == null) {
122 return methodHandler.invoke(true);
123 }
124
125 Thread currentThread = Thread.currentThread();
126
127 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
128
129 try {
130 ClassLoader classLoader =
131 (ClassLoader)PortletBeanLocatorUtil.locate(
132 servletContextName, "portletClassLoader");
133
134 currentThread.setContextClassLoader(classLoader);
135
136 return methodHandler.invoke(true);
137 }
138 catch(Exception e) {
139 throw e;
140 }
141 finally {
142 currentThread.setContextClassLoader(contextClassLoader);
143 }
144 }
145
146 protected List<Address> getDepartAddresses(View view) {
147 List<Address> departAddresses = new ArrayList<Address>();
148
149 List<org.jgroups.Address> jGroupsAddresses = view.getMembers();
150 List<org.jgroups.Address> lastJGroupsAddresses =
151 _lastView.getMembers();
152
153 List<org.jgroups.Address> tempAddresses =
154 new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
155
156 tempAddresses.addAll(jGroupsAddresses);
157
158 List<org.jgroups.Address> lastAddresses =
159 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
160
161 lastAddresses.addAll(lastJGroupsAddresses);
162
163 tempAddresses.retainAll(lastJGroupsAddresses);
164 lastAddresses.removeAll(tempAddresses);
165
166 if (!lastAddresses.isEmpty()) {
167 Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
168
169 while (itr.hasNext()) {
170 departAddresses.add(new AddressImpl(itr.next()));
171 }
172 }
173
174 return departAddresses;
175 }
176
177 protected void processClusterRequest(
178 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
179 org.jgroups.Address localAddress) {
180
181 ClusterMessageType clusterMessageType =
182 clusterRequest.getClusterMessageType();
183
184 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
185
186 try {
187 ClusterNode localClusterNode =
188 _clusterExecutorImpl.getLocalClusterNode();
189
190 clusterNodeResponse.setClusterNode(localClusterNode);
191 }
192 catch (Exception e) {
193 clusterNodeResponse.setException(e);
194 }
195
196 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
197 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
198
199 ClusterNode originatingClusterNode =
200 clusterRequest.getOriginatingClusterNode();
201
202 if (originatingClusterNode != null) {
203 _clusterExecutorImpl.memberJoined(
204 new AddressImpl(sourceAddress), originatingClusterNode);
205
206 clusterNodeResponse.setClusterMessageType(clusterMessageType);
207 }
208 else {
209 if (_log.isWarnEnabled()) {
210 _log.warn(
211 "Content of notify message does not contain cluster " +
212 "node information");
213 }
214
215 return;
216 }
217 }
218 else {
219 clusterNodeResponse.setClusterMessageType(
220 ClusterMessageType.EXECUTE);
221 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
222 clusterNodeResponse.setUuid(clusterRequest.getUuid());
223
224 MethodHandler methodHandler = clusterRequest.getMethodHandler();
225
226 if (methodHandler != null) {
227 try {
228 ClusterInvokeThreadLocal.setEnabled(false);
229
230 Object returnValue = invoke(
231 clusterRequest.getServletContextName(), methodHandler);
232
233 if (returnValue instanceof Serializable) {
234 clusterNodeResponse.setResult(returnValue);
235 }
236 else if (returnValue != null) {
237 clusterNodeResponse.setException(
238 new ClusterException(
239 "Return value is not serializable"));
240 }
241 }
242 catch (Exception e) {
243 clusterNodeResponse.setException(e);
244
245 _log.error("Failed to invoke method " + methodHandler, e);
246 }
247 finally {
248 ClusterInvokeThreadLocal.setEnabled(true);
249 }
250 }
251 else {
252 clusterNodeResponse.setException(
253 new ClusterException(
254 "Payload is not of type " +
255 MethodHandler.class.getName()));
256 }
257 }
258
259 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
260
261 try {
262 controlChannel.send(
263 sourceAddress, localAddress, clusterNodeResponse);
264 }
265 catch (ChannelException ce) {
266 _log.error(
267 "Unable to send response message " + clusterNodeResponse, ce);
268 }
269 catch (Throwable t) {
270 _log.error(t, t);
271 }
272 }
273
274 protected void processClusterResponse(
275 ClusterNodeResponse clusterNodeResponse,
276 org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
277
278 ClusterMessageType clusterMessageType =
279 clusterNodeResponse.getClusterMessageType();
280
281 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
282 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
283
284 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
285
286 if (clusterNode != null) {
287 Address joinAddress = new AddressImpl(sourceAddress);
288
289 _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
290 }
291 else {
292 if (_log.isWarnEnabled()) {
293 _log.warn(
294 "Response of notify message does not contain cluster " +
295 "node information");
296 }
297 }
298
299 return;
300 }
301
302 String uuid = clusterNodeResponse.getUuid();
303
304 FutureClusterResponses futureClusterResponses =
305 _clusterExecutorImpl.getExecutionResults(uuid);
306
307 if (futureClusterResponses == null) {
308 if (_log.isInfoEnabled()) {
309 _log.info("Unable to find response container for " + uuid);
310 }
311
312 return;
313 }
314
315 Address address = new AddressImpl(sourceAddress);
316
317 if (futureClusterResponses.expectsReply(address)) {
318 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
319 }
320 else {
321 if (_log.isWarnEnabled()) {
322 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
323 }
324 }
325 }
326
327 protected boolean processLocalMessage(
328 Object message, org.jgroups.Address sourceAddress) {
329
330 if (message instanceof ClusterRequest) {
331 ClusterRequest clusterRequest = (ClusterRequest)message;
332
333 if (clusterRequest.isSkipLocal()) {
334 return true;
335 }
336
337 ClusterMessageType clusterMessageType =
338 clusterRequest.getClusterMessageType();
339
340 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
341 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
342
343 ClusterNode originatingClusterNode =
344 clusterRequest.getOriginatingClusterNode();
345
346 if (originatingClusterNode != null) {
347 Address joinAddress = new AddressImpl(sourceAddress);
348
349 _clusterExecutorImpl.memberJoined(
350 joinAddress, originatingClusterNode);
351 }
352 else {
353 if (_log.isWarnEnabled()) {
354 _log.warn(
355 "Content of notify message does not contain " +
356 "cluster node information");
357 }
358 }
359
360 return true;
361 }
362 }
363
364 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
365 return true;
366 }
367
368 return false;
369 }
370
371 private static Log _log = LogFactoryUtil.getLog(
372 ClusterRequestReceiver.class);
373
374 private ClusterExecutorImpl _clusterExecutorImpl;
375 private View _lastView;
376
377 }