1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * This library is free software; you can redistribute it and/or modify it under
5    * the terms of the GNU Lesser General Public License as published by the Free
6    * Software Foundation; either version 2.1 of the License, or (at your option)
7    * any later version.
8    *
9    * This library is distributed in the hope that it will be useful, but WITHOUT
10   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11   * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12   * details.
13   */
14  
15  package com.liferay.portal.cluster;
16  
17  import com.liferay.portal.kernel.cluster.Address;
18  import com.liferay.portal.kernel.cluster.ClusterLink;
19  import com.liferay.portal.kernel.cluster.Priority;
20  import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
21  import com.liferay.portal.kernel.log.Log;
22  import com.liferay.portal.kernel.log.LogFactoryUtil;
23  import com.liferay.portal.kernel.messaging.Message;
24  import com.liferay.portal.kernel.util.GetterUtil;
25  import com.liferay.portal.kernel.util.IPDetector;
26  import com.liferay.portal.kernel.util.OSDetector;
27  import com.liferay.portal.kernel.util.PropsKeys;
28  import com.liferay.portal.kernel.util.SocketUtil;
29  import com.liferay.portal.kernel.util.StringBundler;
30  import com.liferay.portal.kernel.util.StringPool;
31  import com.liferay.portal.kernel.util.Validator;
32  import com.liferay.portal.util.PropsUtil;
33  import com.liferay.portal.util.PropsValues;
34  
35  import java.io.IOException;
36  
37  import java.net.InetAddress;
38  import java.net.NetworkInterface;
39  
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.List;
43  import java.util.Properties;
44  import java.util.Vector;
45  
46  import org.jgroups.ChannelException;
47  import org.jgroups.JChannel;
48  import org.jgroups.Receiver;
49  import org.jgroups.View;
50  
51  /**
52   * <a href="ClusterLinkImpl.java.html"><b><i>View Source</i></b></a>
53   *
54   * @author Shuyang Zhou
55   */
56  public class ClusterLinkImpl implements ClusterLink {
57  
58      public void afterPropertiesSet() {
59          if (!PropsValues.CLUSTER_LINK_ENABLED) {
60              return;
61          }
62  
63          if (OSDetector.isUnix() && IPDetector.isSupportsV6() &&
64              !IPDetector.isPrefersV4() && _log.isWarnEnabled()) {
65  
66              StringBundler sb = new StringBundler(4);
67  
68              sb.append("You are on an Unix server with IPv6 enabled. JGroups ");
69              sb.append("may not work with IPv6. If you see a multicast ");
70              sb.append("error, try adding java.net.preferIPv4Stack=true ");
71              sb.append("as a JVM startup parameter.");
72  
73              _log.warn(sb.toString());
74          }
75  
76          initSystemProperties();
77  
78          try {
79              initBindAddress();
80          }
81          catch (IOException ioe) {
82              if (_log.isWarnEnabled()) {
83                  _log.warn("Failed to initialize outgoing IP address", ioe);
84              }
85          }
86  
87          try {
88              initChannels();
89          }
90          catch (Exception e) {
91              _log.error(e, e);
92          }
93      }
94  
95      public void destroy() {
96          if (!PropsValues.CLUSTER_LINK_ENABLED) {
97              return;
98          }
99  
100         for (JChannel channel : _transportChannels) {
101             channel.close();
102         }
103     }
104 
105     public List<Address> getControlAddresses() {
106         if (!PropsValues.CLUSTER_LINK_ENABLED) {
107             return Collections.EMPTY_LIST;
108         }
109 
110         return getAddresses(_controlChannel);
111     }
112 
113     public Address getLocalControlAddress() {
114         if (!PropsValues.CLUSTER_LINK_ENABLED) {
115             return null;
116         }
117 
118         return new AddressImpl(_controlChannel.getLocalAddress());
119     }
120 
121     public List<Address> getLocalTransportAddresses() {
122         if (!PropsValues.CLUSTER_LINK_ENABLED) {
123             return Collections.EMPTY_LIST;
124         }
125 
126         List<Address> addresses = new ArrayList<Address>(
127             _localTransportAddresses.size());
128 
129         for (org.jgroups.Address address : _localTransportAddresses) {
130             addresses.add(new AddressImpl(address));
131         }
132 
133         return addresses;
134     }
135 
136     public List<Address> getTransportAddresses(Priority priority) {
137         if (!PropsValues.CLUSTER_LINK_ENABLED) {
138             return Collections.EMPTY_LIST;
139         }
140 
141         JChannel channel = getChannel(priority);
142 
143         return getAddresses(channel);
144     }
145 
146     public boolean isEnabled() {
147         return PropsValues.CLUSTER_LINK_ENABLED;
148     }
149 
150     public void sendMulticastMessage(Message message, Priority priority) {
151         if (!PropsValues.CLUSTER_LINK_ENABLED) {
152             return;
153         }
154 
155         JChannel channel = getChannel(priority);
156 
157         try {
158             channel.send(null, null, message);
159         }
160         catch (ChannelException ce) {
161             _log.error("Unable to send multicast message " + message, ce);
162         }
163     }
164 
165     public void sendUnicastMessage(
166         Address address, Message message, Priority priority) {
167 
168         if (!PropsValues.CLUSTER_LINK_ENABLED) {
169             return;
170         }
171 
172         org.jgroups.Address jGroupsAddress =
173             (org.jgroups.Address)address.getRealAddress();
174 
175         JChannel channel = getChannel(priority);
176 
177         try {
178             channel.send(jGroupsAddress, null, message);
179         }
180         catch (ChannelException ce) {
181             _log.error("Unable to send unicast message:" + message, ce);
182         }
183     }
184 
185     public void setClusterForwardMessageListener(
186         ClusterForwardMessageListener clusterForwardMessageListener) {
187 
188         _clusterForwardMessageListener = clusterForwardMessageListener;
189     }
190 
191     protected JChannel createChannel(
192             String properties, Receiver receiver, String clusterName)
193         throws ChannelException {
194 
195         JChannel channel = new JChannel(properties);
196 
197         channel.setReceiver(receiver);
198 
199         channel.connect(clusterName);
200 
201         if (_log.isInfoEnabled()) {
202             _log.info(
203                 "Create a new channel with properties " +
204                     channel.getProperties());
205         }
206 
207         return channel;
208     }
209 
210     protected List<Address> getAddresses(JChannel channel) {
211         View view = channel.getView();
212 
213         Vector<org.jgroups.Address> jGroupsAddresses = view.getMembers();
214 
215         if (jGroupsAddresses == null) {
216             return Collections.EMPTY_LIST;
217         }
218 
219         List<Address> addresses = new ArrayList<Address>(
220             jGroupsAddresses.size());
221 
222         for (org.jgroups.Address address : jGroupsAddresses) {
223             addresses.add(new AddressImpl(address));
224         }
225 
226         return addresses;
227     }
228 
229     protected JChannel getChannel(Priority priority) {
230         int channelIndex =
231             priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
232 
233         if (_log.isDebugEnabled()) {
234             _log.debug(
235                 "Select channel number " + channelIndex + " for priority " +
236                     priority);
237         }
238 
239         return _transportChannels.get(channelIndex);
240     }
241 
242     protected void initBindAddress() throws IOException {
243         String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
244 
245         if (Validator.isNull(autodetectAddress)) {
246             return;
247         }
248 
249         String host = autodetectAddress;
250         int port = 80;
251 
252         int index = autodetectAddress.indexOf(StringPool.COLON);
253 
254         if (index != -1) {
255             host = autodetectAddress.substring(0, index);
256             port = GetterUtil.getInteger(
257                 autodetectAddress.substring(index + 1), port);
258         }
259 
260         if (_log.isInfoEnabled()) {
261             _log.info(
262                 "Autodetecting JGroups outgoing IP address and interface for " +
263                     host + ":" + port);
264         }
265 
266         SocketUtil.BindInfo bindInfo = SocketUtil.getBindInfo(host, port);
267 
268         InetAddress inetAddress = bindInfo.getInetAddress();
269         NetworkInterface networkInterface = bindInfo.getNetworkInterface();
270 
271         System.setProperty("jgroups.bind_addr", inetAddress.getHostAddress());
272         System.setProperty(
273             "jgroups.bind_interface", networkInterface.getName());
274 
275         if (_log.isInfoEnabled()) {
276             _log.info(
277                 "Setting JGroups outgoing IP address to " +
278                     inetAddress.getHostAddress() + " and interface to " +
279                         networkInterface.getName());
280         }
281     }
282 
283     protected void initChannels() throws ChannelException {
284         Properties controlProperty = PropsUtil.getProperties(
285             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
286 
287         _controlChannel = createChannel(
288             controlProperty.getProperty(
289                 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL),
290             new ClusterInvokeReceiver(_controlChannel),
291             _LIFERAY_CONTROL_CHANNEL);
292 
293         Properties transportProperties = PropsUtil.getProperties(
294             PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
295 
296         _channelCount = transportProperties.size();
297 
298         if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
299             throw new IllegalArgumentException(
300                 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
301         }
302 
303         _localTransportAddresses = new ArrayList<org.jgroups.Address>(
304             _channelCount);
305         _transportChannels = new ArrayList<JChannel>(_channelCount);
306 
307         List<String> keys = new ArrayList<String>(_channelCount);
308 
309         for (Object key : transportProperties.keySet()) {
310             keys.add((String)key);
311         }
312 
313         Collections.sort(keys);
314 
315         for (int i = 0; i < keys.size(); i++) {
316             String customName = keys.get(i);
317 
318             String value = transportProperties.getProperty(customName);
319 
320             JChannel channel = createChannel(
321                 value,
322                 new ClusterForwardReceiver(
323                     _localTransportAddresses, _clusterForwardMessageListener),
324                     _LIFERAY_TRANSPORT_CHANNEL + i);
325 
326             _localTransportAddresses.add(channel.getLocalAddress());
327             _transportChannels.add(channel);
328         }
329     }
330 
331     protected void initSystemProperties() {
332         for (String systemProperty :
333                 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
334 
335             int index = systemProperty.indexOf(StringPool.COLON);
336 
337             if (index == -1) {
338                 continue;
339             }
340 
341             String key = systemProperty.substring(0, index);
342             String value = systemProperty.substring(index + 1);
343 
344             System.setProperty(key, value);
345 
346             if (_log.isDebugEnabled()) {
347                 _log.debug(
348                     "Setting system property {key=" + key + ", value=" +
349                         value + "}");
350             }
351         }
352     }
353 
354     private static final String _LIFERAY_CONTROL_CHANNEL =
355         "LIFERAY-CONTROL-CHANNEL";
356 
357     private static final String _LIFERAY_TRANSPORT_CHANNEL =
358         "LIFERAY-TRANSPORT-CHANNEL-";
359 
360     private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
361 
362     private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
363 
364     private int _channelCount;
365     private ClusterForwardMessageListener _clusterForwardMessageListener;
366     private JChannel _controlChannel;
367     private List<org.jgroups.Address> _localTransportAddresses;
368     private List<JChannel> _transportChannels;
369 
370 }