1
14
15 package com.liferay.portal.cluster;
16
17 import com.liferay.portal.kernel.cluster.Address;
18 import com.liferay.portal.kernel.cluster.ClusterLink;
19 import com.liferay.portal.kernel.cluster.Priority;
20 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
21 import com.liferay.portal.kernel.log.Log;
22 import com.liferay.portal.kernel.log.LogFactoryUtil;
23 import com.liferay.portal.kernel.messaging.Message;
24 import com.liferay.portal.kernel.util.GetterUtil;
25 import com.liferay.portal.kernel.util.IPDetector;
26 import com.liferay.portal.kernel.util.OSDetector;
27 import com.liferay.portal.kernel.util.PropsKeys;
28 import com.liferay.portal.kernel.util.SocketUtil;
29 import com.liferay.portal.kernel.util.StringBundler;
30 import com.liferay.portal.kernel.util.StringPool;
31 import com.liferay.portal.kernel.util.Validator;
32 import com.liferay.portal.util.PropsUtil;
33 import com.liferay.portal.util.PropsValues;
34
35 import java.io.IOException;
36
37 import java.net.InetAddress;
38 import java.net.NetworkInterface;
39
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.List;
43 import java.util.Properties;
44 import java.util.Vector;
45
46 import org.jgroups.ChannelException;
47 import org.jgroups.JChannel;
48 import org.jgroups.Receiver;
49 import org.jgroups.View;
50
51
56 public class ClusterLinkImpl implements ClusterLink {
57
58 public void afterPropertiesSet() {
59 if (!PropsValues.CLUSTER_LINK_ENABLED) {
60 return;
61 }
62
63 if (OSDetector.isUnix() && IPDetector.isSupportsV6() &&
64 !IPDetector.isPrefersV4() && _log.isWarnEnabled()) {
65
66 StringBundler sb = new StringBundler(4);
67
68 sb.append("You are on an Unix server with IPv6 enabled. JGroups ");
69 sb.append("may not work with IPv6. If you see a multicast ");
70 sb.append("error, try adding java.net.preferIPv4Stack=true ");
71 sb.append("as a JVM startup parameter.");
72
73 _log.warn(sb.toString());
74 }
75
76 initSystemProperties();
77
78 try {
79 initBindAddress();
80 }
81 catch (IOException ioe) {
82 if (_log.isWarnEnabled()) {
83 _log.warn("Failed to initialize outgoing IP address", ioe);
84 }
85 }
86
87 try {
88 initChannels();
89 }
90 catch (Exception e) {
91 _log.error(e, e);
92 }
93 }
94
95 public void destroy() {
96 if (!PropsValues.CLUSTER_LINK_ENABLED) {
97 return;
98 }
99
100 for (JChannel channel : _transportChannels) {
101 channel.close();
102 }
103 }
104
105 public List<Address> getControlAddresses() {
106 if (!PropsValues.CLUSTER_LINK_ENABLED) {
107 return Collections.EMPTY_LIST;
108 }
109
110 return getAddresses(_controlChannel);
111 }
112
113 public Address getLocalControlAddress() {
114 if (!PropsValues.CLUSTER_LINK_ENABLED) {
115 return null;
116 }
117
118 return new AddressImpl(_controlChannel.getLocalAddress());
119 }
120
121 public List<Address> getLocalTransportAddresses() {
122 if (!PropsValues.CLUSTER_LINK_ENABLED) {
123 return Collections.EMPTY_LIST;
124 }
125
126 List<Address> addresses = new ArrayList<Address>(
127 _localTransportAddresses.size());
128
129 for (org.jgroups.Address address : _localTransportAddresses) {
130 addresses.add(new AddressImpl(address));
131 }
132
133 return addresses;
134 }
135
136 public List<Address> getTransportAddresses(Priority priority) {
137 if (!PropsValues.CLUSTER_LINK_ENABLED) {
138 return Collections.EMPTY_LIST;
139 }
140
141 JChannel channel = getChannel(priority);
142
143 return getAddresses(channel);
144 }
145
146 public boolean isEnabled() {
147 return PropsValues.CLUSTER_LINK_ENABLED;
148 }
149
150 public void sendMulticastMessage(Message message, Priority priority) {
151 if (!PropsValues.CLUSTER_LINK_ENABLED) {
152 return;
153 }
154
155 JChannel channel = getChannel(priority);
156
157 try {
158 channel.send(null, null, message);
159 }
160 catch (ChannelException ce) {
161 _log.error("Unable to send multicast message " + message, ce);
162 }
163 }
164
165 public void sendUnicastMessage(
166 Address address, Message message, Priority priority) {
167
168 if (!PropsValues.CLUSTER_LINK_ENABLED) {
169 return;
170 }
171
172 org.jgroups.Address jGroupsAddress =
173 (org.jgroups.Address)address.getRealAddress();
174
175 JChannel channel = getChannel(priority);
176
177 try {
178 channel.send(jGroupsAddress, null, message);
179 }
180 catch (ChannelException ce) {
181 _log.error("Unable to send unicast message:" + message, ce);
182 }
183 }
184
185 public void setClusterForwardMessageListener(
186 ClusterForwardMessageListener clusterForwardMessageListener) {
187
188 _clusterForwardMessageListener = clusterForwardMessageListener;
189 }
190
191 protected JChannel createChannel(
192 String properties, Receiver receiver, String clusterName)
193 throws ChannelException {
194
195 JChannel channel = new JChannel(properties);
196
197 channel.setReceiver(receiver);
198
199 channel.connect(clusterName);
200
201 if (_log.isInfoEnabled()) {
202 _log.info(
203 "Create a new channel with properties " +
204 channel.getProperties());
205 }
206
207 return channel;
208 }
209
210 protected List<Address> getAddresses(JChannel channel) {
211 View view = channel.getView();
212
213 Vector<org.jgroups.Address> jGroupsAddresses = view.getMembers();
214
215 if (jGroupsAddresses == null) {
216 return Collections.EMPTY_LIST;
217 }
218
219 List<Address> addresses = new ArrayList<Address>(
220 jGroupsAddresses.size());
221
222 for (org.jgroups.Address address : jGroupsAddresses) {
223 addresses.add(new AddressImpl(address));
224 }
225
226 return addresses;
227 }
228
229 protected JChannel getChannel(Priority priority) {
230 int channelIndex =
231 priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
232
233 if (_log.isDebugEnabled()) {
234 _log.debug(
235 "Select channel number " + channelIndex + " for priority " +
236 priority);
237 }
238
239 return _transportChannels.get(channelIndex);
240 }
241
242 protected void initBindAddress() throws IOException {
243 String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
244
245 if (Validator.isNull(autodetectAddress)) {
246 return;
247 }
248
249 String host = autodetectAddress;
250 int port = 80;
251
252 int index = autodetectAddress.indexOf(StringPool.COLON);
253
254 if (index != -1) {
255 host = autodetectAddress.substring(0, index);
256 port = GetterUtil.getInteger(
257 autodetectAddress.substring(index + 1), port);
258 }
259
260 if (_log.isInfoEnabled()) {
261 _log.info(
262 "Autodetecting JGroups outgoing IP address and interface for " +
263 host + ":" + port);
264 }
265
266 SocketUtil.BindInfo bindInfo = SocketUtil.getBindInfo(host, port);
267
268 InetAddress inetAddress = bindInfo.getInetAddress();
269 NetworkInterface networkInterface = bindInfo.getNetworkInterface();
270
271 System.setProperty("jgroups.bind_addr", inetAddress.getHostAddress());
272 System.setProperty(
273 "jgroups.bind_interface", networkInterface.getName());
274
275 if (_log.isInfoEnabled()) {
276 _log.info(
277 "Setting JGroups outgoing IP address to " +
278 inetAddress.getHostAddress() + " and interface to " +
279 networkInterface.getName());
280 }
281 }
282
283 protected void initChannels() throws ChannelException {
284 Properties controlProperty = PropsUtil.getProperties(
285 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
286
287 _controlChannel = createChannel(
288 controlProperty.getProperty(
289 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL),
290 new ClusterInvokeReceiver(_controlChannel),
291 _LIFERAY_CONTROL_CHANNEL);
292
293 Properties transportProperties = PropsUtil.getProperties(
294 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
295
296 _channelCount = transportProperties.size();
297
298 if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
299 throw new IllegalArgumentException(
300 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
301 }
302
303 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
304 _channelCount);
305 _transportChannels = new ArrayList<JChannel>(_channelCount);
306
307 List<String> keys = new ArrayList<String>(_channelCount);
308
309 for (Object key : transportProperties.keySet()) {
310 keys.add((String)key);
311 }
312
313 Collections.sort(keys);
314
315 for (int i = 0; i < keys.size(); i++) {
316 String customName = keys.get(i);
317
318 String value = transportProperties.getProperty(customName);
319
320 JChannel channel = createChannel(
321 value,
322 new ClusterForwardReceiver(
323 _localTransportAddresses, _clusterForwardMessageListener),
324 _LIFERAY_TRANSPORT_CHANNEL + i);
325
326 _localTransportAddresses.add(channel.getLocalAddress());
327 _transportChannels.add(channel);
328 }
329 }
330
331 protected void initSystemProperties() {
332 for (String systemProperty :
333 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
334
335 int index = systemProperty.indexOf(StringPool.COLON);
336
337 if (index == -1) {
338 continue;
339 }
340
341 String key = systemProperty.substring(0, index);
342 String value = systemProperty.substring(index + 1);
343
344 System.setProperty(key, value);
345
346 if (_log.isDebugEnabled()) {
347 _log.debug(
348 "Setting system property {key=" + key + ", value=" +
349 value + "}");
350 }
351 }
352 }
353
354 private static final String _LIFERAY_CONTROL_CHANNEL =
355 "LIFERAY-CONTROL-CHANNEL";
356
357 private static final String _LIFERAY_TRANSPORT_CHANNEL =
358 "LIFERAY-TRANSPORT-CHANNEL-";
359
360 private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
361
362 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
363
364 private int _channelCount;
365 private ClusterForwardMessageListener _clusterForwardMessageListener;
366 private JChannel _controlChannel;
367 private List<org.jgroups.Address> _localTransportAddresses;
368 private List<JChannel> _transportChannels;
369
370 }