1   /**
2    * Copyright (c) 2000-2009 Liferay, Inc. All rights reserved.
3    *
4    * Permission is hereby granted, free of charge, to any person obtaining a copy
5    * of this software and associated documentation files (the "Software"), to deal
6    * in the Software without restriction, including without limitation the rights
7    * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8    * copies of the Software, and to permit persons to whom the Software is
9    * furnished to do so, subject to the following conditions:
10   *
11   * The above copyright notice and this permission notice shall be included in
12   * all copies or substantial portions of the Software.
13   *
14   * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15   * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16   * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17   * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18   * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19   * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20   * SOFTWARE.
21   */
22  
23  package com.liferay.portal.cluster;
24  
25  import com.liferay.portal.kernel.cluster.Address;
26  import com.liferay.portal.kernel.cluster.ClusterLink;
27  import com.liferay.portal.kernel.cluster.Priority;
28  import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29  import com.liferay.portal.kernel.log.Log;
30  import com.liferay.portal.kernel.log.LogFactoryUtil;
31  import com.liferay.portal.kernel.messaging.Message;
32  import com.liferay.portal.kernel.util.GetterUtil;
33  import com.liferay.portal.kernel.util.SocketUtil;
34  import com.liferay.portal.kernel.util.StringPool;
35  import com.liferay.portal.kernel.util.Validator;
36  import com.liferay.portal.util.PropsKeys;
37  import com.liferay.portal.util.PropsUtil;
38  import com.liferay.portal.util.PropsValues;
39  
40  import java.io.IOException;
41  
42  import java.util.ArrayList;
43  import java.util.Collections;
44  import java.util.List;
45  import java.util.Properties;
46  import java.util.Vector;
47  
48  import org.jgroups.ChannelException;
49  import org.jgroups.JChannel;
50  import org.jgroups.ReceiverAdapter;
51  import org.jgroups.View;
52  
53  /**
54   * <a href="ClusterLinkImpl.java.html"><b><i>View Source</i></b></a>
55   *
56   * @author Shuyang Zhou
57   */
58  public class ClusterLinkImpl implements ClusterLink {
59  
60      public void afterPropertiesSet() {
61          initSystemProperties();
62  
63          try {
64              initBindAddress();
65          }
66          catch (IOException ioe) {
67              if (_log.isWarnEnabled()) {
68                  _log.warn("Failed to initialize outgoing IP address", ioe);
69              }
70          }
71  
72          try {
73              initChannels();
74          }
75          catch (Exception e) {
76              _log.error(e, e);
77          }
78      }
79  
80      public void destory() {
81          for (JChannel channel : _channels) {
82              channel.close();
83          }
84      }
85  
86      public List<Address> getAddresses() {
87          Vector<org.jgroups.Address> jGroupsAddresses =
88              _channels.get(0).getView().getMembers();
89  
90          if (jGroupsAddresses == null) {
91              return new ArrayList<Address>();
92          }
93  
94          List<Address> addresses = new ArrayList<Address>(
95              jGroupsAddresses.size());
96  
97          for (org.jgroups.Address address : jGroupsAddresses) {
98              addresses.add(new AddressImpl(address));
99          }
100 
101         return addresses;
102     }
103 
104     public void sendMulticastMessage(Message message, Priority priority) {
105         JChannel channel = getChannel(priority);
106 
107         try {
108             channel.send(null, null, message);
109         }
110         catch (ChannelException ce) {
111             _log.error("Unable to send multicast message " + message, ce);
112         }
113     }
114 
115     public void sendUnicastMessage(
116         Address address, Message message, Priority priority) {
117 
118         org.jgroups.Address jGroupsAddress =
119             (org.jgroups.Address)address.getRealAddress();
120 
121         JChannel channel = getChannel(priority);
122 
123         try {
124             channel.send(jGroupsAddress, null, message);
125         }
126         catch (ChannelException ex) {
127             _log.error("Unable to send multicast message:" + message, ex);
128         }
129     }
130 
131     public void setClusterForwardMessageListener(
132         ClusterForwardMessageListener clusterForwardMessageListener) {
133 
134         _clusterForwardMessageListener = clusterForwardMessageListener;
135     }
136 
137     protected JChannel createChannel(int index, String properties)
138         throws ChannelException {
139 
140         JChannel channel = new JChannel(properties);
141 
142         channel.setReceiver(
143             new ReceiverAdapter() {
144 
145                 public void receive(org.jgroups.Message message) {
146                     if (!_addresses.contains(message.getSrc())) {
147                         _clusterForwardMessageListener.receive(
148                             (Message)message.getObject());
149                     }
150                     else {
151                         if (_log.isDebugEnabled()) {
152                             _log.debug("Block received message " + message);
153                         }
154                     }
155                 }
156 
157                 public void viewAccepted(View view) {
158                     if (_log.isDebugEnabled()) {
159                         _log.debug("Cluster link accepted view " + view);
160                     }
161                 }
162 
163             }
164         );
165 
166         channel.connect(_LIFERAY_CHANNEL + index);
167 
168         if (_log.isInfoEnabled()) {
169             _log.info(
170                 "Create a new channel with properties " +
171                     channel.getProperties());
172         }
173 
174         return channel;
175     }
176 
177     protected JChannel getChannel(Priority priority) {
178         int channelIndex =
179             priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
180 
181         if (_log.isDebugEnabled()) {
182             _log.debug(
183                 "Select channel number " + channelIndex + " for priority " +
184                     priority);
185         }
186 
187         return _channels.get(channelIndex);
188     }
189 
190     protected void initBindAddress() throws IOException {
191         String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
192 
193         if (Validator.isNull(autodetectAddress)) {
194             return;
195         }
196 
197         String host = autodetectAddress;
198         int port = 80;
199 
200         int index = autodetectAddress.indexOf(StringPool.COLON);
201 
202         if (index != -1) {
203             host = autodetectAddress.substring(0, index);
204             port = GetterUtil.getInteger(
205                 autodetectAddress.substring(index + 1), port);
206         }
207 
208         String bindAddress = SocketUtil.getHostAddress(host, port);
209 
210         System.setProperty("jgroups.bind_addr", bindAddress);
211 
212         if (_log.isInfoEnabled()) {
213             _log.info(
214                 "Set JGroups outgoing IP address to " + bindAddress + "}");
215         }
216     }
217 
218     protected void initChannels() throws ChannelException {
219         Properties properties = PropsUtil.getProperties(
220             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
221 
222         _channelCount = properties.size();
223 
224         if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
225             throw new IllegalArgumentException(
226                 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
227         }
228 
229         _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
230         _channels = new ArrayList<JChannel>(_channelCount);
231 
232         List<String> keys = new ArrayList<String>(_channelCount);
233 
234         for (Object key : properties.keySet()) {
235             keys.add((String)key);
236         }
237 
238         Collections.sort(keys);
239 
240         for (int i = 0; i < keys.size(); i++) {
241             String customName = keys.get(i);
242 
243             String value = properties.getProperty(customName);
244 
245             JChannel channel = createChannel(i, value);
246 
247             _addresses.add(channel.getLocalAddress());
248             _channels.add(channel);
249         }
250     }
251 
252     protected void initSystemProperties() {
253         for (String systemProperty :
254                 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
255 
256             int index = systemProperty.indexOf(StringPool.COLON);
257 
258             if (index == -1) {
259                 continue;
260             }
261 
262             String key = systemProperty.substring(0, index);
263             String value = systemProperty.substring(index + 1);
264 
265             System.setProperty(key, value);
266 
267             if (_log.isDebugEnabled()) {
268                 _log.debug(
269                     "Setting system property {key=" + key + ", value=" +
270                         value + "}");
271             }
272         }
273     }
274 
275     private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
276 
277     private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
278 
279     private static final Log _log =
280         LogFactoryUtil.getLog(ClusterLinkImpl.class);
281 
282     private List<org.jgroups.Address> _addresses;
283     private int _channelCount;
284     private List<JChannel> _channels;
285     private ClusterForwardMessageListener _clusterForwardMessageListener;
286 
287 }