1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    *
5    *
6    *
7    * The contents of this file are subject to the terms of the Liferay Enterprise
8    * Subscription License ("License"). You may not use this file except in
9    * compliance with the License. You can obtain a copy of the License by
10   * contacting Liferay, Inc. See the License for the specific language governing
11   * permissions and limitations under the License, including but not limited to
12   * distribution rights of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.kernel.messaging.sender;
24  
25  import com.liferay.portal.kernel.messaging.Message;
26  import com.liferay.portal.kernel.messaging.MessageBus;
27  import com.liferay.portal.kernel.messaging.MessageBusException;
28  import com.liferay.portal.kernel.messaging.MessageListener;
29  
30  /**
31   * <a href="SynchronousMessageListener.java.html"><b><i>View Source</i></b></a>
32   *
33   * @author Michael C. Han
34   */
35  public class SynchronousMessageListener implements MessageListener {
36  
37      public SynchronousMessageListener(
38          MessageBus messageBus, Message message, long timeout) {
39  
40          _messageBus = messageBus;
41          _message = message;
42          _timeout = timeout;
43          _responseId = _message.getResponseId();
44      }
45  
46      public Object getResults() {
47          return _results;
48      }
49  
50      public void receive(Message message) {
51          if (!message.getResponseId().equals(_responseId)) {
52              return;
53          }
54  
55          synchronized (this) {
56              _results = message.getPayload();
57  
58              notify();
59          }
60      }
61  
62      public Object send() throws MessageBusException {
63          String destinationName = _message.getDestinationName();
64          String responseDestinationName = _message.getResponseDestinationName();
65  
66          _messageBus.registerMessageListener(responseDestinationName, this);
67  
68          try {
69              synchronized (this) {
70                  _messageBus.sendMessage(destinationName, _message);
71  
72                  wait(_timeout);
73  
74                  if (_results == null) {
75                      throw new MessageBusException(
76                          "No reply received for message: " + _message);
77                  }
78              }
79  
80              return _results;
81          }
82          catch (InterruptedException ie) {
83              throw new MessageBusException(
84                  "Message sending interrupted for: " + _message, ie);
85          }
86          finally {
87              _messageBus.unregisterMessageListener(
88                  responseDestinationName, this);
89          }
90      }
91  
92      private MessageBus _messageBus;
93      private Message _message;
94      private long _timeout;
95      private String _responseId;
96      private Object _results;
97  
98  }