001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.util.PropsKeys;
025    import com.liferay.portal.util.PropsUtil;
026    import com.liferay.portal.util.PropsValues;
027    
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.List;
031    import java.util.Properties;
032    
033    import org.jgroups.ChannelException;
034    import org.jgroups.JChannel;
035    
036    /**
037     * @author Shuyang Zhou
038     */
039    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
040    
041            public void destroy() {
042                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
043                            return;
044                    }
045    
046                    for (JChannel jChannel : _transportChannels) {
047                            jChannel.close();
048                    }
049            }
050    
051            public List<Address> getLocalTransportAddresses() {
052                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
053                            return Collections.EMPTY_LIST;
054                    }
055    
056                    List<Address> addresses = new ArrayList<Address>(
057                            _localTransportAddresses.size());
058    
059                    for (org.jgroups.Address address : _localTransportAddresses) {
060                            addresses.add(new AddressImpl(address));
061                    }
062    
063                    return addresses;
064            }
065    
066            public List<Address> getTransportAddresses(Priority priority) {
067                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
068                            return Collections.EMPTY_LIST;
069                    }
070    
071                    JChannel jChannel = getChannel(priority);
072    
073                    return getAddresses(jChannel);
074            }
075    
076            public void sendMulticastMessage(Message message, Priority priority) {
077                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
078                            return;
079                    }
080    
081                    JChannel jChannel = getChannel(priority);
082    
083                    try {
084                            jChannel.send(null, null, message);
085                    }
086                    catch (ChannelException ce) {
087                            _log.error("Unable to send multicast message " + message, ce);
088                    }
089            }
090    
091            public void sendUnicastMessage(
092                    Address address, Message message, Priority priority) {
093    
094                    if (!PropsValues.CLUSTER_LINK_ENABLED) {
095                            return;
096                    }
097    
098                    org.jgroups.Address jGroupsAddress =
099                            (org.jgroups.Address)address.getRealAddress();
100    
101                    JChannel jChannel = getChannel(priority);
102    
103                    try {
104                            jChannel.send(jGroupsAddress, null, message);
105                    }
106                    catch (ChannelException ce) {
107                            _log.error("Unable to send unicast message:" + message, ce);
108                    }
109            }
110    
111            public void setClusterForwardMessageListener(
112                    ClusterForwardMessageListener clusterForwardMessageListener) {
113    
114                    _clusterForwardMessageListener = clusterForwardMessageListener;
115            }
116    
117            protected JChannel getChannel(Priority priority) {
118                    int channelIndex =
119                            priority.ordinal() * _channelCount / _MAX_CHANNEL_COUNT;
120    
121                    if (_log.isDebugEnabled()) {
122                            _log.debug(
123                                    "Select channel number " + channelIndex + " for priority " +
124                                            priority);
125                    }
126    
127                    return _transportChannels.get(channelIndex);
128            }
129    
130            protected void initChannels() throws ChannelException {
131                    Properties transportProperties = PropsUtil.getProperties(
132                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
133    
134                    _channelCount = transportProperties.size();
135    
136                    if ((_channelCount <= 0) || (_channelCount > _MAX_CHANNEL_COUNT)) {
137                            throw new IllegalArgumentException(
138                                    "Channel count must be between 1 and " + _MAX_CHANNEL_COUNT);
139                    }
140    
141                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
142                            _channelCount);
143                    _transportChannels = new ArrayList<JChannel>(_channelCount);
144    
145                    List<String> keys = new ArrayList<String>(_channelCount);
146    
147                    for (Object key : transportProperties.keySet()) {
148                            keys.add((String)key);
149                    }
150    
151                    Collections.sort(keys);
152    
153                    for (int i = 0; i < keys.size(); i++) {
154                            String customName = keys.get(i);
155    
156                            String value = transportProperties.getProperty(customName);
157    
158                            JChannel jChannel = createJChannel(
159                                    value,
160                                    new ClusterForwardReceiver(
161                                            _localTransportAddresses, _clusterForwardMessageListener),
162                                            _LIFERAY_TRANSPORT_CHANNEL + i);
163    
164                            _localTransportAddresses.add(jChannel.getLocalAddress());
165                            _transportChannels.add(jChannel);
166                    }
167            }
168    
169            private static final String _LIFERAY_TRANSPORT_CHANNEL =
170                    "LIFERAY-TRANSPORT-CHANNEL-";
171    
172            private static final int _MAX_CHANNEL_COUNT = Priority.values().length;
173    
174            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
175    
176            private int _channelCount;
177            private ClusterForwardMessageListener _clusterForwardMessageListener;
178            private List<org.jgroups.Address> _localTransportAddresses;
179            private List<JChannel> _transportChannels;
180    
181    }