1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.kernel.bean.PortletBeanLocatorUtil;
18 import com.liferay.portal.kernel.cluster.Address;
19 import com.liferay.portal.kernel.cluster.ClusterException;
20 import com.liferay.portal.kernel.cluster.ClusterMessageType;
21 import com.liferay.portal.kernel.cluster.ClusterNode;
22 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
23 import com.liferay.portal.kernel.cluster.ClusterRequest;
24 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.MethodHandler;
28
29 import java.io.Serializable;
30
31 import java.util.ArrayList;
32 import java.util.Iterator;
33 import java.util.List;
34
35 import org.jgroups.Channel;
36 import org.jgroups.ChannelException;
37 import org.jgroups.Message;
38 import org.jgroups.View;
39
40
46 public class ClusterRequestReceiver extends BaseReceiver {
47
48 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
49 _clusterExecutorImpl = clusterExecutorImpl;
50 }
51
52 public void receive(Message message) {
53 org.jgroups.Address sourceAddress = message.getSrc();
54
55 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
56
57 org.jgroups.Address localAddress = controlChannel.getAddress();
58
59 Object obj = message.getObject();
60
61 if (obj == null) {
62 if (_log.isWarnEnabled()) {
63 _log.warn("Message content is null");
64 }
65
66 return;
67 }
68
69 if (localAddress.equals(sourceAddress)) {
70 boolean isProcessed = processLocalMessage(obj, sourceAddress);
71
72 if (isProcessed) {
73 return;
74 }
75 }
76
77 if (obj instanceof ClusterRequest) {
78 ClusterRequest clusterRequest = (ClusterRequest)obj;
79
80 processClusterRequest(clusterRequest, sourceAddress, localAddress);
81 }
82 else if (obj instanceof ClusterNodeResponse) {
83 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
84
85 processClusterResponse(
86 clusterNodeResponse, sourceAddress, localAddress);
87 }
88 else {
89 if (_log.isWarnEnabled()) {
90 _log.warn(
91 "Unable to process message content of type " +
92 obj.getClass().getName());
93 }
94 }
95 }
96
97 public void viewAccepted(View view) {
98 if (_log.isDebugEnabled()) {
99 _log.debug("Accepted view " + view);
100 }
101
102 if (_lastView == null) {
103 _lastView = view;
104
105 return;
106 }
107
108 List<Address> departAddresses = getDepartAddresses(view);
109
110 _lastView = view;
111
112 if (departAddresses.isEmpty()) {
113 return;
114 }
115
116 _clusterExecutorImpl.memberRemoved(departAddresses);
117 }
118
119 protected Object invoke(
120 String servletContextName, MethodHandler methodHandler)
121 throws Exception {
122
123 if (servletContextName == null) {
124 return methodHandler.invoke(true);
125 }
126
127 Thread currentThread = Thread.currentThread();
128
129 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
130
131 try {
132 ClassLoader classLoader =
133 (ClassLoader)PortletBeanLocatorUtil.locate(
134 servletContextName, "portletClassLoader");
135
136 currentThread.setContextClassLoader(classLoader);
137
138 return methodHandler.invoke(true);
139 }
140 catch(Exception e) {
141 throw e;
142 }
143 finally {
144 currentThread.setContextClassLoader(contextClassLoader);
145 }
146 }
147
148 protected List<Address> getDepartAddresses(View view) {
149 List<Address> departAddresses = new ArrayList<Address>();
150
151 List<org.jgroups.Address> jGroupsAddresses = view.getMembers();
152 List<org.jgroups.Address> lastJGroupsAddresses =
153 _lastView.getMembers();
154
155 List<org.jgroups.Address> tempAddresses =
156 new ArrayList<org.jgroups.Address>(jGroupsAddresses.size());
157
158 tempAddresses.addAll(jGroupsAddresses);
159
160 List<org.jgroups.Address> lastAddresses =
161 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses.size());
162
163 lastAddresses.addAll(lastJGroupsAddresses);
164
165 tempAddresses.retainAll(lastJGroupsAddresses);
166 lastAddresses.removeAll(tempAddresses);
167
168 if (!lastAddresses.isEmpty()) {
169 Iterator<org.jgroups.Address> itr = lastAddresses.iterator();
170
171 while (itr.hasNext()) {
172 departAddresses.add(new AddressImpl(itr.next()));
173 }
174 }
175
176 return departAddresses;
177 }
178
179 protected void processClusterRequest(
180 ClusterRequest clusterRequest, org.jgroups.Address sourceAddress,
181 org.jgroups.Address localAddress) {
182
183 ClusterMessageType clusterMessageType =
184 clusterRequest.getClusterMessageType();
185
186 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
187
188 try {
189 ClusterNode localClusterNode =
190 _clusterExecutorImpl.getLocalClusterNode();
191
192 clusterNodeResponse.setClusterNode(localClusterNode);
193 }
194 catch (Exception e) {
195 clusterNodeResponse.setException(e);
196 }
197
198 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
199 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
200
201 ClusterNode originatingClusterNode =
202 clusterRequest.getOriginatingClusterNode();
203
204 if (originatingClusterNode != null) {
205 _clusterExecutorImpl.memberJoined(
206 new AddressImpl(sourceAddress), originatingClusterNode);
207
208 clusterNodeResponse.setClusterMessageType(clusterMessageType);
209 }
210 else {
211 if (_log.isWarnEnabled()) {
212 _log.warn(
213 "Content of notify message does not contain cluster " +
214 "node information");
215 }
216
217 return;
218 }
219 }
220 else {
221 clusterNodeResponse.setClusterMessageType(
222 ClusterMessageType.EXECUTE);
223 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
224 clusterNodeResponse.setUuid(clusterRequest.getUuid());
225
226 MethodHandler methodHandler = clusterRequest.getMethodHandler();
227
228 if (methodHandler != null) {
229 try {
230 ClusterInvokeThreadLocal.setEnabled(false);
231
232 Object returnValue = invoke(
233 clusterRequest.getServletContextName(), methodHandler);
234
235 if (returnValue instanceof Serializable) {
236 clusterNodeResponse.setResult(returnValue);
237 }
238 else if (returnValue != null) {
239 clusterNodeResponse.setException(
240 new ClusterException(
241 "Return value is not serializable"));
242 }
243 }
244 catch (Exception e) {
245 clusterNodeResponse.setException(e);
246
247 _log.error("Failed to invoke method " + methodHandler, e);
248 }
249 finally {
250 ClusterInvokeThreadLocal.setEnabled(true);
251 }
252 }
253 else {
254 clusterNodeResponse.setException(
255 new ClusterException(
256 "Payload is not of type " +
257 MethodHandler.class.getName()));
258 }
259 }
260
261 Channel controlChannel = _clusterExecutorImpl.getControlChannel();
262
263 try {
264 controlChannel.send(
265 sourceAddress, localAddress, clusterNodeResponse);
266 }
267 catch (ChannelException ce) {
268 _log.error(
269 "Unable to send response message " + clusterNodeResponse, ce);
270 }
271 catch (Throwable t) {
272 _log.error(t, t);
273 }
274 }
275
276 protected void processClusterResponse(
277 ClusterNodeResponse clusterNodeResponse,
278 org.jgroups.Address sourceAddress, org.jgroups.Address localAddress) {
279
280 ClusterMessageType clusterMessageType =
281 clusterNodeResponse.getClusterMessageType();
282
283 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
284 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
285
286 ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
287
288 if (clusterNode != null) {
289 Address joinAddress = new AddressImpl(sourceAddress);
290
291 _clusterExecutorImpl.memberJoined(joinAddress, clusterNode);
292 }
293 else {
294 if (_log.isWarnEnabled()) {
295 _log.warn(
296 "Response of notify message does not contain cluster " +
297 "node information");
298 }
299 }
300
301 return;
302 }
303
304 String uuid = clusterNodeResponse.getUuid();
305
306 FutureClusterResponses futureClusterResponses =
307 _clusterExecutorImpl.getExecutionResults(uuid);
308
309 if (futureClusterResponses == null) {
310 if (_log.isInfoEnabled()) {
311 _log.info("Unable to find response container for " + uuid);
312 }
313
314 return;
315 }
316
317 Address address = new AddressImpl(sourceAddress);
318
319 if (futureClusterResponses.expectsReply(address)) {
320 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
321 }
322 else {
323 if (_log.isWarnEnabled()) {
324 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
325 }
326 }
327 }
328
329 protected boolean processLocalMessage(
330 Object message, org.jgroups.Address sourceAddress) {
331
332 if (message instanceof ClusterRequest) {
333 ClusterRequest clusterRequest = (ClusterRequest)message;
334
335 if (clusterRequest.isSkipLocal()) {
336 return true;
337 }
338
339 ClusterMessageType clusterMessageType =
340 clusterRequest.getClusterMessageType();
341
342 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
343 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
344
345 ClusterNode originatingClusterNode =
346 clusterRequest.getOriginatingClusterNode();
347
348 if (originatingClusterNode != null) {
349 Address joinAddress = new AddressImpl(sourceAddress);
350
351 _clusterExecutorImpl.memberJoined(
352 joinAddress, originatingClusterNode);
353 }
354 else {
355 if (_log.isWarnEnabled()) {
356 _log.warn(
357 "Content of notify message does not contain " +
358 "cluster node information");
359 }
360 }
361
362 return true;
363 }
364 }
365
366 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
367 return true;
368 }
369
370 return false;
371 }
372
373 private static Log _log = LogFactoryUtil.getLog(
374 ClusterRequestReceiver.class);
375
376 private ClusterExecutorImpl _clusterExecutorImpl;
377 private View _lastView;
378
379 }