1
22
23 package com.liferay.portal.cluster;
24
25 import com.liferay.portal.kernel.cluster.Address;
26 import com.liferay.portal.kernel.cluster.ClusterLink;
27 import com.liferay.portal.kernel.cluster.Priority;
28 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
29 import com.liferay.portal.kernel.log.Log;
30 import com.liferay.portal.kernel.log.LogFactoryUtil;
31 import com.liferay.portal.kernel.messaging.Message;
32 import com.liferay.portal.kernel.util.GetterUtil;
33 import com.liferay.portal.kernel.util.SocketUtil;
34 import com.liferay.portal.kernel.util.StringPool;
35 import com.liferay.portal.kernel.util.Validator;
36 import com.liferay.portal.util.PropsKeys;
37 import com.liferay.portal.util.PropsUtil;
38 import com.liferay.portal.util.PropsValues;
39
40 import java.io.IOException;
41
42 import java.util.ArrayList;
43 import java.util.Collections;
44 import java.util.List;
45 import java.util.Properties;
46 import java.util.Vector;
47
48 import org.jgroups.ChannelException;
49 import org.jgroups.JChannel;
50 import org.jgroups.ReceiverAdapter;
51 import org.jgroups.View;
52
53
58 public class ClusterLinkImpl implements ClusterLink {
59
60 public void afterPropertiesSet() {
61 initSystemProperties();
62
63 try {
64 initBindAddress();
65 }
66 catch (IOException ioe) {
67 if (_log.isWarnEnabled()) {
68 _log.warn("Failed to initialize outgoing IP address", ioe);
69 }
70 }
71
72 try {
73 initChannels();
74 }
75 catch (Exception e) {
76 _log.error(e, e);
77 }
78 }
79
80 public void destory() {
81 for (JChannel channel : _channels) {
82 channel.close();
83 }
84 }
85
86 public List<Address> getAddresses() {
87 Vector<org.jgroups.Address> jGroupsAddresses =
88 _channels.get(0).getView().getMembers();
89
90 if (jGroupsAddresses == null) {
91 return new ArrayList<Address>();
92 }
93
94 List<Address> addresses = new ArrayList<Address>(
95 jGroupsAddresses.size());
96
97 for (org.jgroups.Address address : jGroupsAddresses) {
98 addresses.add(new AddressImpl(address));
99 }
100
101 return addresses;
102 }
103
104 public void sendMulticastMessage(Message message, Priority priority) {
105 JChannel channel = getChannel(priority);
106
107 try {
108 channel.send(null, null, message);
109 }
110 catch (ChannelException ce) {
111 _log.error("Unable to send multicast message " + message, ce);
112 }
113 }
114
115 public void sendUnicastMessage(
116 Address address, Message message, Priority priority) {
117
118 org.jgroups.Address jGroupsAddress =
119 (org.jgroups.Address)address.getRealAddress();
120
121 JChannel channel = getChannel(priority);
122
123 try {
124 channel.send(jGroupsAddress, null, message);
125 }
126 catch (ChannelException ex) {
127 _log.error("Unable to send multicast message:" + message, ex);
128 }
129 }
130
131 public void setClusterForwardMessageListener(
132 ClusterForwardMessageListener clusterForwardMessageListener) {
133
134 _clusterForwardMessageListener = clusterForwardMessageListener;
135 }
136
137 protected JChannel createChannel(int index, String properties)
138 throws ChannelException {
139
140 JChannel channel = new JChannel(properties);
141
142 channel.setReceiver(
143 new ReceiverAdapter() {
144
145 public void receive(org.jgroups.Message message) {
146 if (!_addresses.contains(message.getSrc())) {
147 _clusterForwardMessageListener.receive(
148 (Message)message.getObject());
149 }
150 else {
151 if (_log.isDebugEnabled()) {
152 _log.debug("Block received message " + message);
153 }
154 }
155 }
156
157 public void viewAccepted(View view) {
158 if (_log.isDebugEnabled()) {
159 _log.debug("Cluster link accepted view " + view);
160 }
161 }
162
163 }
164 );
165
166 channel.connect(_LIFERAY_CHANNEL + index);
167
168 if (_log.isInfoEnabled()) {
169 _log.info(
170 "Create a new channel with properties " +
171 channel.getProperties());
172 }
173
174 return channel;
175 }
176
177 protected JChannel getChannel(Priority priority) {
178 int channelIndex =
179 priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
180
181 if (_log.isDebugEnabled()) {
182 _log.debug(
183 "Select channel number " + channelIndex + " for priority " +
184 priority);
185 }
186
187 return _channels.get(channelIndex);
188 }
189
190 protected void initBindAddress() throws IOException {
191 String autodetectAddress = PropsValues.CLUSTER_LINK_AUTODETECT_ADDRESS;
192
193 if (Validator.isNull(autodetectAddress)) {
194 return;
195 }
196
197 String host = autodetectAddress;
198 int port = 80;
199
200 int index = autodetectAddress.indexOf(StringPool.COLON);
201
202 if (index != -1) {
203 host = autodetectAddress.substring(0, index);
204 port = GetterUtil.getInteger(
205 autodetectAddress.substring(index + 1), port);
206 }
207
208 String bindAddress = SocketUtil.getHostAddress(host, port);
209
210 System.setProperty("jgroups.bind_addr", bindAddress);
211
212 if (_log.isInfoEnabled()) {
213 _log.info(
214 "Set JGroups outgoing IP address to " + bindAddress + "}");
215 }
216 }
217
218 protected void initChannels() throws ChannelException {
219 Properties properties = PropsUtil.getProperties(
220 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES, true);
221
222 _channelCount = properties.size();
223
224 if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
225 throw new IllegalArgumentException(
226 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
227 }
228
229 _addresses = new ArrayList<org.jgroups.Address>(_channelCount);
230 _channels = new ArrayList<JChannel>(_channelCount);
231
232 List<String> keys = new ArrayList<String>(_channelCount);
233
234 for (Object key : properties.keySet()) {
235 keys.add((String)key);
236 }
237
238 Collections.sort(keys);
239
240 for (int i = 0; i < keys.size(); i++) {
241 String customName = keys.get(i);
242
243 String value = properties.getProperty(customName);
244
245 JChannel channel = createChannel(i, value);
246
247 _addresses.add(channel.getLocalAddress());
248 _channels.add(channel);
249 }
250 }
251
252 protected void initSystemProperties() {
253 for (String systemProperty :
254 PropsValues.CLUSTER_LINK_CHANNEL_SYSTEM_PROPERTIES) {
255
256 int index = systemProperty.indexOf(StringPool.COLON);
257
258 if (index == -1) {
259 continue;
260 }
261
262 String key = systemProperty.substring(0, index);
263 String value = systemProperty.substring(index + 1);
264
265 System.setProperty(key, value);
266
267 if (_log.isDebugEnabled()) {
268 _log.debug(
269 "Setting system property {key=" + key + ", value=" +
270 value + "}");
271 }
272 }
273 }
274
275 private static final String _LIFERAY_CHANNEL = "LIFERAY-CHANNEL-";
276
277 private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
278
279 private static final Log _log =
280 LogFactoryUtil.getLog(ClusterLinkImpl.class);
281
282 private List<org.jgroups.Address> _addresses;
283 private int _channelCount;
284 private List<JChannel> _channels;
285 private ClusterForwardMessageListener _clusterForwardMessageListener;
286
287 }