1
22
23 package com.liferay.portal.kernel.messaging.sender;
24
25 import com.liferay.portal.kernel.messaging.Message;
26 import com.liferay.portal.kernel.messaging.MessageBus;
27 import com.liferay.portal.kernel.messaging.MessageBusException;
28 import com.liferay.portal.kernel.messaging.MessageListener;
29
30
35 public class SynchronousMessageListener implements MessageListener {
36
37 public SynchronousMessageListener(
38 MessageBus messageBus, Message message, long timeout) {
39
40 _messageBus = messageBus;
41 _message = message;
42 _timeout = timeout;
43 _responseId = _message.getResponseId();
44 }
45
46 public Object getResults() {
47 return _results;
48 }
49
50 public void receive(Message message) {
51 if (!message.getResponseId().equals(_responseId)) {
52 return;
53 }
54
55 synchronized (this) {
56 _results = message.getPayload();
57
58 notify();
59 }
60 }
61
62 public Object send() throws MessageBusException {
63 String destinationName = _message.getDestinationName();
64 String responseDestinationName = _message.getResponseDestinationName();
65
66 _messageBus.registerMessageListener(responseDestinationName, this);
67
68 try {
69 synchronized (this) {
70 _messageBus.sendMessage(destinationName, _message);
71
72 wait(_timeout);
73
74 if (_results == null) {
75 throw new MessageBusException(
76 "No reply received for message: " + _message);
77 }
78 }
79
80 return _results;
81 }
82 catch (InterruptedException ie) {
83 throw new MessageBusException(
84 "Message sending interrupted for: " + _message, ie);
85 }
86 finally {
87 _messageBus.unregisterMessageListener(
88 responseDestinationName, this);
89 }
90 }
91
92 private MessageBus _messageBus;
93 private Message _message;
94 private long _timeout;
95 private String _responseId;
96 private Object _results;
97
98 }