001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.cluster.BaseReceiver;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.Serializable;
022    
023    import java.rmi.RemoteException;
024    
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import net.sf.ehcache.Cache;
029    import net.sf.ehcache.CacheException;
030    import net.sf.ehcache.CacheManager;
031    import net.sf.ehcache.Ehcache;
032    import net.sf.ehcache.Element;
033    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034    import net.sf.ehcache.distribution.CachePeer;
035    import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036    import net.sf.ehcache.distribution.jgroups.JGroupSerializable;
037    
038    import org.jgroups.Address;
039    import org.jgroups.JChannel;
040    import org.jgroups.Message;
041    
042    /**
043     * <p>
044     * See http://issues.liferay.com/browse/LPS-11061.
045     * </p>
046     *
047     * @author Tina Tian
048     */
049    public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050    
051            public JGroupsManager(
052                    CacheManager cacheManager, String clusterName,
053                    String channelProperties) {
054    
055                    try {
056                            _jChannel = new JChannel(channelProperties);
057    
058                            _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059    
060                            _jChannel.connect(clusterName);
061    
062                            if (_log.isInfoEnabled()) {
063                                    _log.info(
064                                            "Create a new channel with properties " +
065                                                    _jChannel.getProperties());
066                            }
067                    }
068                    catch (Exception e) {
069                            if (_log.isErrorEnabled()) {
070                                    _log.error("Unable to initialize channels", e);
071                            }
072                    }
073    
074                    _cacheManager = cacheManager;
075            }
076    
077            public void dispose() throws CacheException {
078                    if (_jChannel != null) {
079                            _jChannel.close();
080                    }
081            }
082    
083            public Address getBusLocalAddress() {
084                    return _jChannel.getAddress();
085            }
086    
087            public List<Address> getBusMembership() {
088                    return _jChannel.getView().getMembers();
089            }
090    
091            @SuppressWarnings("rawtypes")
092            public List getElements(List list) {
093                    return null;
094            }
095    
096            public String getGuid() {
097                    return null;
098            }
099    
100            @SuppressWarnings("rawtypes")
101            public List getKeys() {
102                    return null;
103            }
104    
105            public String getName() {
106                    return null;
107            }
108    
109            public Element getQuiet(Serializable serializable) {
110                    return null;
111            }
112    
113            public String getScheme() {
114                    return _SCHEME;
115            }
116    
117            public long getTimeForClusterToForm() {
118                    return 0;
119            }
120    
121            public String getUrl() {
122                    return null;
123            }
124    
125            public String getUrlBase() {
126                    return null;
127            }
128    
129            public void handleNotification(Serializable serializable) {
130                    if (serializable instanceof JGroupSerializable) {
131                            handleJGroupsNotification((JGroupSerializable)serializable);
132                    }
133                    else if (serializable instanceof List<?>) {
134                            List<?> valueList = (List<?>)serializable;
135    
136                            for (Object object : valueList) {
137                                    if (object instanceof JGroupSerializable) {
138                                            handleJGroupsNotification((JGroupSerializable)object);
139                                    }
140                            }
141                    }
142            }
143    
144            public void init() {
145            }
146    
147            public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
148                    List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
149    
150                    cachePeers.add(this);
151    
152                    return cachePeers;
153            }
154    
155            public void put(Element element) {
156            }
157    
158            public void registerPeer(String string) {
159            }
160    
161            public boolean remove(Serializable serializable) {
162                    return false;
163            }
164    
165            public void removeAll() {
166            }
167    
168            @SuppressWarnings("rawtypes")
169            public void send(Address address, List eventMessages)
170                    throws RemoteException {
171    
172                    ArrayList<JGroupSerializable> jGroupSerializables =
173                            new ArrayList<JGroupSerializable>();
174    
175                    for (Object eventMessage : eventMessages) {
176                            if (eventMessage instanceof JGroupEventMessage) {
177                                    JGroupEventMessage jGroupEventMessage =
178                                            (JGroupEventMessage)eventMessage;
179    
180                                    JGroupSerializable jGroupSerializable = toJGroupSerializable(
181                                            jGroupEventMessage);
182    
183                                    jGroupSerializables.add(jGroupSerializable);
184                            }
185                            else {
186                                    if (_log.isDebugEnabled()) {
187                                            _log.debug(
188                                                    eventMessage + "is not a JGroupEventMessage type");
189                                    }
190                            }
191                    }
192    
193                    try {
194                            _jChannel.send(address, null, jGroupSerializables);
195                    }
196                    catch (Throwable t) {
197                            throw new RemoteException(t.getMessage());
198                    }
199            }
200    
201            @SuppressWarnings("rawtypes")
202            public void send(List eventMessages) throws RemoteException {
203                    send(null, eventMessages);
204            }
205    
206            public void unregisterPeer(String string) {
207            }
208    
209            protected void handleJGroupsNotification(
210                    JGroupSerializable jGroupSerializable) {
211    
212                    Cache cache = _cacheManager.getCache(
213                            jGroupSerializable.getCacheName());
214    
215                    if (cache == null) {
216                            return;
217                    }
218    
219                    int event = jGroupSerializable.getEvent();
220                    Serializable key = jGroupSerializable.getKey();
221                    Serializable value = jGroupSerializable.getValue();
222    
223                    if ((event == JGroupEventMessage.REMOVE) &&
224                            (cache.getQuiet(key) != null)) {
225    
226                            cache.remove(key, true);
227                    }
228                    else if (event == JGroupEventMessage.REMOVE_ALL) {
229                            cache.removeAll(true);
230                    }
231                    else if (event == JGroupEventMessage.PUT) {
232                            cache.put(new Element(key, value), true);
233                    }
234            }
235    
236            protected JGroupSerializable toJGroupSerializable(
237                    JGroupEventMessage jGroupEventMessage) {
238    
239                    Element element = jGroupEventMessage.getElement();
240    
241                    Serializable value = null;
242    
243                    if (element != null) {
244                            value = element.getValue();
245                    }
246    
247                    return new JGroupSerializable(
248                            jGroupEventMessage.getEvent(),
249                            jGroupEventMessage.getSerializableKey(), value,
250                            jGroupEventMessage.getCacheName());
251            }
252    
253            private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
254    
255            private static final String _SCHEME = "JGroups";
256    
257            private CacheManager _cacheManager;
258            private JChannel _jChannel;
259    
260            private class EhcacheJGroupsReceiver extends BaseReceiver {
261    
262                    public void receive(Message message) {
263                            Object object = message.getObject();
264    
265                            if (object == null) {
266                                    if (_log.isWarnEnabled()) {
267                                            _log.warn("Message content is null");
268                                    }
269    
270                                    return;
271                            }
272    
273                            if (object instanceof Serializable) {
274                                    handleNotification((Serializable)object);
275                            }
276                            else {
277                                    if (_log.isWarnEnabled()) {
278                                            _log.warn(
279                                                    "Unable to process message content of type " +
280                                                            object.getClass().getName());
281                                    }
282                            }
283                    }
284    
285            }
286    
287    }