1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.cluster.Address;
18  import com.liferay.portal.kernel.cluster.ClusterLink;
19  import com.liferay.portal.kernel.cluster.Priority;
20  import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
21  import com.liferay.portal.kernel.log.Log;
22  import com.liferay.portal.kernel.log.LogFactoryUtil;
23  import com.liferay.portal.kernel.messaging.Message;
24  import com.liferay.portal.kernel.util.PropsKeys;
25  import com.liferay.portal.util.PropsUtil;
26  import com.liferay.portal.util.PropsValues;
27  
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.List;
31  import java.util.Properties;
32  
33  import org.jgroups.ChannelException;
34  import org.jgroups.JChannel;
35  
36  /**
37   * <a href="ClusterLinkImpl.java.html"><b><i>View Source</i></b></a>
38   *
39   * @author Shuyang Zhou
40   */
41  public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
42  
43      public void destroy() {
44          if (!PropsValues.CLUSTER_LINK_ENABLED) {
45              return;
46          }
47  
48          for (JChannel jChannel : _transportChannels) {
49              jChannel.close();
50          }
51      }
52  
53      public List<Address> getLocalTransportAddresses() {
54          if (!PropsValues.CLUSTER_LINK_ENABLED) {
55              return Collections.EMPTY_LIST;
56          }
57  
58          List<Address> addresses = new ArrayList<Address>(
59              _localTransportAddresses.size());
60  
61          for (org.jgroups.Address address : _localTransportAddresses) {
62              addresses.add(new AddressImpl(address));
63          }
64  
65          return addresses;
66      }
67  
68      public List<Address> getTransportAddresses(Priority priority) {
69          if (!PropsValues.CLUSTER_LINK_ENABLED) {
70              return Collections.EMPTY_LIST;
71          }
72  
73          JChannel jChannel = getChannel(priority);
74  
75          return getAddresses(jChannel);
76      }
77  
78      public void sendMulticastMessage(Message message, Priority priority) {
79          if (!PropsValues.CLUSTER_LINK_ENABLED) {
80              return;
81          }
82  
83          JChannel jChannel = getChannel(priority);
84  
85          try {
86              jChannel.send(null, null, message);
87          }
88          catch (ChannelException ce) {
89              _log.error("Unable to send multicast message " + message, ce);
90          }
91      }
92  
93      public void sendUnicastMessage(
94          Address address, Message message, Priority priority) {
95  
96          if (!PropsValues.CLUSTER_LINK_ENABLED) {
97              return;
98          }
99  
100         org.jgroups.Address jGroupsAddress =
101             (org.jgroups.Address)address.getRealAddress();
102 
103         JChannel jChannel = getChannel(priority);
104 
105         try {
106             jChannel.send(jGroupsAddress, null, message);
107         }
108         catch (ChannelException ce) {
109             _log.error("Unable to send unicast message:" + message, ce);
110         }
111     }
112 
113     public void setClusterForwardMessageListener(
114         ClusterForwardMessageListener clusterForwardMessageListener) {
115 
116         _clusterForwardMessageListener = clusterForwardMessageListener;
117     }
118 
119     protected JChannel getChannel(Priority priority) {
120         int channelIndex =
121             priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
122 
123         if (_log.isDebugEnabled()) {
124             _log.debug(
125                 "Select channel number " + channelIndex + " for priority " +
126                     priority);
127         }
128 
129         return _transportChannels.get(channelIndex);
130     }
131 
132     protected void initChannels() throws ChannelException {
133         Properties transportProperties = PropsUtil.getProperties(
134             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
135 
136         _channelCount = transportProperties.size();
137 
138         if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
139             throw new IllegalArgumentException(
140                 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
141         }
142 
143         _localTransportAddresses = new ArrayList<org.jgroups.Address>(
144             _channelCount);
145         _transportChannels = new ArrayList<JChannel>(_channelCount);
146 
147         List<String> keys = new ArrayList<String>(_channelCount);
148 
149         for (Object key : transportProperties.keySet()) {
150             keys.add((String)key);
151         }
152 
153         Collections.sort(keys);
154 
155         for (int i = 0; i < keys.size(); i++) {
156             String customName = keys.get(i);
157 
158             String value = transportProperties.getProperty(customName);
159 
160             JChannel jChannel = createJChannel(
161                 value,
162                 new ClusterForwardReceiver(
163                     _localTransportAddresses, _clusterForwardMessageListener),
164                     _LIFERAY_TRANSPORT_CHANNEL + i);
165 
166             _localTransportAddresses.add(jChannel.getLocalAddress());
167             _transportChannels.add(jChannel);
168         }
169     }
170 
171     private static final String _LIFERAY_TRANSPORT_CHANNEL =
172         "LIFERAY-TRANSPORT-CHANNEL-";
173 
174     private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
175 
176     private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
177 
178     private int _channelCount;
179     private ClusterForwardMessageListener _clusterForwardMessageListener;
180     private List<org.jgroups.Address> _localTransportAddresses;
181     private List<JChannel> _transportChannels;
182 
183 }