1
14
15 package com.liferay.portal.kernel.messaging.sender;
16
17 import com.liferay.portal.kernel.messaging.Message;
18 import com.liferay.portal.kernel.messaging.MessageBus;
19 import com.liferay.portal.kernel.messaging.MessageBusException;
20 import com.liferay.portal.kernel.messaging.MessageListener;
21
22
27 public class SynchronousMessageListener implements MessageListener {
28
29 public SynchronousMessageListener(
30 MessageBus messageBus, Message message, long timeout) {
31
32 _messageBus = messageBus;
33 _message = message;
34 _timeout = timeout;
35 _responseId = _message.getResponseId();
36 }
37
38 public Object getResults() {
39 return _results;
40 }
41
42 public void receive(Message message) {
43 if (!message.getResponseId().equals(_responseId)) {
44 return;
45 }
46
47 synchronized (this) {
48 _results = message.getPayload();
49
50 notify();
51 }
52 }
53
54 public Object send() throws MessageBusException {
55 String destinationName = _message.getDestinationName();
56 String responseDestinationName = _message.getResponseDestinationName();
57
58 _messageBus.registerMessageListener(responseDestinationName, this);
59
60 try {
61 synchronized (this) {
62 _messageBus.sendMessage(destinationName, _message);
63
64 wait(_timeout);
65
66 if (_results == null) {
67 throw new MessageBusException(
68 "No reply received for message: " + _message);
69 }
70 }
71
72 return _results;
73 }
74 catch (InterruptedException ie) {
75 throw new MessageBusException(
76 "Message sending interrupted for: " + _message, ie);
77 }
78 finally {
79 _messageBus.unregisterMessageListener(
80 responseDestinationName, this);
81 }
82 }
83
84 private MessageBus _messageBus;
85 private Message _message;
86 private long _timeout;
87 private String _responseId;
88 private Object _results;
89
90 }