1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
12   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
13   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
14   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
15   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
16   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
17   * SOFTWARE.
18   */
19  
20  package com.liferay.portal.kernel.messaging.sender;
21  
22  import com.liferay.portal.kernel.messaging.Message;
23  import com.liferay.portal.kernel.messaging.MessageBus;
24  import com.liferay.portal.kernel.messaging.MessageBusException;
25  import com.liferay.portal.kernel.messaging.MessageListener;
26  
27  /**
28   * <a href="SynchronousMessageListener.java.html"><b><i>View Source</i></b></a>
29   *
30   * @author Michael C. Han
31   *
32   */
33  public class SynchronousMessageListener implements MessageListener {
34  
35      public SynchronousMessageListener(
36          MessageBus messageBus, Message message, long timeout) {
37  
38          _messageBus = messageBus;
39          _message = message;
40          _timeout = timeout;
41          _responseId = _message.getResponseId();
42      }
43  
44      public Object getResults() {
45          return _results;
46      }
47  
48      public void receive(Message message) {
49          if (!message.getResponseId().equals(_responseId)) {
50              return;
51          }
52  
53          synchronized (this) {
54              _results = message.getPayload();
55  
56              notify();
57          }
58      }
59  
60      public Object send() throws MessageBusException {
61          String destination = _message.getDestination();
62          String responseDestination = _message.getResponseDestination();
63  
64          _messageBus.registerMessageListener(responseDestination, this);
65  
66          try {
67              synchronized (this) {
68                  _messageBus.sendMessage(destination, _message);
69  
70                  wait(_timeout);
71  
72                  if (_results == null) {
73                      throw new MessageBusException(
74                          "No reply received for message: " + _message);
75                  }
76              }
77  
78              return _results;
79          }
80          catch (InterruptedException ie) {
81              throw new MessageBusException(
82                  "Message sending interrupted for: " + _message, ie);
83          }
84          finally {
85              _messageBus.unregisterMessageListener(responseDestination, this);
86          }
87      }
88  
89      private MessageBus _messageBus;
90      private Message _message;
91      private long _timeout;
92      private String _responseId;
93      private Object _results;
94  
95  }