001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.util.PropsKeys;
025 import com.liferay.portal.util.PropsUtil;
026 import com.liferay.portal.util.PropsValues;
027
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.List;
031 import java.util.Properties;
032
033 import org.jgroups.ChannelException;
034 import org.jgroups.JChannel;
035
036
039 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
040
041 public void destroy() {
042 if (!PropsValues.CLUSTER_LINK_ENABLED) {
043 return;
044 }
045
046 for (JChannel jChannel : _transportChannels) {
047 jChannel.close();
048 }
049 }
050
051 public List<Address> getLocalTransportAddresses() {
052 if (!PropsValues.CLUSTER_LINK_ENABLED) {
053 return Collections.EMPTY_LIST;
054 }
055
056 List<Address> addresses = new ArrayList<Address>(
057 _localTransportAddresses.size());
058
059 for (org.jgroups.Address address : _localTransportAddresses) {
060 addresses.add(new AddressImpl(address));
061 }
062
063 return addresses;
064 }
065
066 public List<Address> getTransportAddresses(Priority priority) {
067 if (!PropsValues.CLUSTER_LINK_ENABLED) {
068 return Collections.EMPTY_LIST;
069 }
070
071 JChannel jChannel = getChannel(priority);
072
073 return getAddresses(jChannel);
074 }
075
076 public void sendMulticastMessage(Message message, Priority priority) {
077 if (!PropsValues.CLUSTER_LINK_ENABLED) {
078 return;
079 }
080
081 JChannel jChannel = getChannel(priority);
082
083 try {
084 jChannel.send(null, null, message);
085 }
086 catch (ChannelException ce) {
087 _log.error("Unable to send multicast message " + message, ce);
088 }
089 }
090
091 public void sendUnicastMessage(
092 Address address, Message message, Priority priority) {
093
094 if (!PropsValues.CLUSTER_LINK_ENABLED) {
095 return;
096 }
097
098 org.jgroups.Address jGroupsAddress =
099 (org.jgroups.Address)address.getRealAddress();
100
101 JChannel jChannel = getChannel(priority);
102
103 try {
104 jChannel.send(jGroupsAddress, null, message);
105 }
106 catch (ChannelException ce) {
107 _log.error("Unable to send unicast message:" + message, ce);
108 }
109 }
110
111 public void setClusterForwardMessageListener(
112 ClusterForwardMessageListener clusterForwardMessageListener) {
113
114 _clusterForwardMessageListener = clusterForwardMessageListener;
115 }
116
117 protected JChannel getChannel(Priority priority) {
118 int channelIndex =
119 priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
120
121 if (_log.isDebugEnabled()) {
122 _log.debug(
123 "Select channel number " + channelIndex + " for priority " +
124 priority);
125 }
126
127 return _transportChannels.get(channelIndex);
128 }
129
130 protected void initChannels() throws ChannelException {
131 Properties transportProperties = PropsUtil.getProperties(
132 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
133
134 _channelCount = transportProperties.size();
135
136 if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
137 throw new IllegalArgumentException(
138 "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
139 }
140
141 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
142 _channelCount);
143 _transportChannels = new ArrayList<JChannel>(_channelCount);
144
145 List<String> keys = new ArrayList<String>(_channelCount);
146
147 for (Object key : transportProperties.keySet()) {
148 keys.add((String)key);
149 }
150
151 Collections.sort(keys);
152
153 for (int i = 0; i < keys.size(); i++) {
154 String customName = keys.get(i);
155
156 String value = transportProperties.getProperty(customName);
157
158 JChannel jChannel = createJChannel(
159 value,
160 new ClusterForwardReceiver(
161 _localTransportAddresses, _clusterForwardMessageListener),
162 _LIFERAY_TRANSPORT_CHANNEL + i);
163
164 _localTransportAddresses.add(jChannel.getLocalAddress());
165 _transportChannels.add(jChannel);
166 }
167 }
168
169 private static final String _LIFERAY_TRANSPORT_CHANNEL =
170 "LIFERAY-TRANSPORT-CHANNEL-";
171
172 private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
173
174 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
175
176 private int _channelCount;
177 private ClusterForwardMessageListener _clusterForwardMessageListener;
178 private List<org.jgroups.Address> _localTransportAddresses;
179 private List<JChannel> _transportChannels;
180
181 }