1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.cache.ehcache;
16  
import com.liferay.portal.cluster.BaseReceiver;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;

import java.io.Serializable;

import java.rmi.RemoteException;

import java.util.ArrayList;
import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.distribution.CacheManagerPeerProvider;
import net.sf.ehcache.distribution.CachePeer;
import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
import net.sf.ehcache.distribution.jgroups.JGroupSerializable;

import org.jgroups.Address;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.View;
41  
42  /**
43   * <a href="JGroupsManager.java.html"><b><i>View Source</i></b></a>
44   *
45   * <p>
46   * See http://issues.liferay.com/browse/LPS-11061.
47   * </p>
48   *
49   * @author Tina Tian
50   */
51  public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
52  
53      public JGroupsManager(
54          CacheManager cacheManager, String clusterName,
55          String channelProperties) {
56  
57          try {
58              _jChannel = new JChannel(channelProperties);
59  
60              _jChannel.setReceiver(new EhcacheJGroupsReceiver());
61  
62              _jChannel.connect(clusterName);
63  
64              if (_log.isInfoEnabled()) {
65                  _log.info(
66                      "Create a new channel with properties " +
67                          _jChannel.getProperties());
68              }
69          }
70          catch (Exception e) {
71              if (_log.isErrorEnabled()) {
72                  _log.error("Unable to initialize channels", e);
73              }
74          }
75  
76          _cacheManager = cacheManager;
77      }
78  
79      public void dispose() throws CacheException {
80          if (_jChannel != null) {
81              _jChannel.close();
82          }
83      }
84  
85      public Address getBusLocalAddress() {
86          return _jChannel.getAddress();
87      }
88  
89      public List<Address> getBusMembership() {
90          return _jChannel.getView().getMembers();
91      }
92  
93      @SuppressWarnings("rawtypes")
94      public List getElements(List list) {
95          return null;
96      }
97  
98      public String getGuid() {
99          return null;
100     }
101 
102     @SuppressWarnings("rawtypes")
103     public List getKeys() {
104         return null;
105     }
106 
107     public String getName() {
108         return null;
109     }
110 
111     public Element getQuiet(Serializable serializable) {
112         return null;
113     }
114 
115     public String getScheme() {
116         return _SCHEME;
117     }
118 
119     public long getTimeForClusterToForm() {
120         return 0;
121     }
122 
123     public String getUrl() {
124         return null;
125     }
126 
127     public String getUrlBase() {
128         return null;
129     }
130 
131     public void handleNotification(Serializable serializable) {
132         if (serializable instanceof JGroupSerializable) {
133             handleJGroupsNotification((JGroupSerializable)serializable);
134         }
135         else if (serializable instanceof List<?>) {
136             List<?> valueList = (List<?>)serializable;
137 
138             for (Object object : valueList) {
139                 if (object instanceof JGroupSerializable) {
140                     handleJGroupsNotification((JGroupSerializable)object);
141                 }
142             }
143         }
144     }
145 
146     public void init() {
147     }
148 
149     public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
150         List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
151 
152         cachePeers.add(this);
153 
154         return cachePeers;
155     }
156 
157     public void put(Element element) {
158     }
159 
160     public void registerPeer(String string) {
161     }
162 
163     public boolean remove(Serializable serializable) {
164         return false;
165     }
166 
167     public void removeAll() {
168     }
169 
170     @SuppressWarnings("rawtypes")
171     public void send(Address address, List eventMessages)
172         throws RemoteException {
173 
174         ArrayList<JGroupSerializable> jGroupSerializables =
175             new ArrayList<JGroupSerializable>();
176 
177         for (Object eventMessage : eventMessages) {
178             if (eventMessage instanceof JGroupEventMessage) {
179                 JGroupEventMessage jGroupEventMessage =
180                     (JGroupEventMessage)eventMessage;
181 
182                 JGroupSerializable jGroupSerializable = toJGroupSerializable(
183                     jGroupEventMessage);
184 
185                 jGroupSerializables.add(jGroupSerializable);
186             }
187             else {
188                 if (_log.isDebugEnabled()) {
189                     _log.debug(
190                         eventMessage + "is not a JGroupEventMessage type");
191                 }
192             }
193         }
194 
195         try {
196             _jChannel.send(address, null, jGroupSerializables);
197         }
198         catch (Throwable t) {
199             throw new RemoteException(t.getMessage());
200         }
201     }
202 
203     @SuppressWarnings("rawtypes")
204     public void send(List eventMessages) throws RemoteException {
205         send(null, eventMessages);
206     }
207 
208     public void unregisterPeer(String string) {
209     }
210 
211     protected void handleJGroupsNotification(
212         JGroupSerializable jGroupSerializable) {
213 
214         Cache cache = _cacheManager.getCache(
215             jGroupSerializable.getCacheName());
216 
217         if (cache == null) {
218             return;
219         }
220 
221         int event = jGroupSerializable.getEvent();
222         Serializable key = jGroupSerializable.getKey();
223         Serializable value = jGroupSerializable.getValue();
224 
225         if ((event == JGroupEventMessage.REMOVE) &&
226             (cache.getQuiet(key) != null)) {
227 
228             cache.remove(key, true);
229         }
230         else if (event == JGroupEventMessage.REMOVE_ALL) {
231             cache.removeAll(true);
232         }
233         else if (event == JGroupEventMessage.PUT) {
234             cache.put(new Element(key, value), true);
235         }
236     }
237 
238     protected JGroupSerializable toJGroupSerializable(
239         JGroupEventMessage jGroupEventMessage) {
240 
241         Element element = jGroupEventMessage.getElement();
242 
243         Serializable value = null;
244 
245         if (element != null) {
246             value = element.getValue();
247         }
248 
249         return new JGroupSerializable(
250             jGroupEventMessage.getEvent(),
251             jGroupEventMessage.getSerializableKey(), value,
252             jGroupEventMessage.getCacheName());
253     }
254 
255     private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
256 
257     private static final String _SCHEME = "JGroups";
258 
259     private CacheManager _cacheManager;
260     private JChannel _jChannel;
261 
262     private class EhcacheJGroupsReceiver extends BaseReceiver {
263 
264         public void receive(Message message) {
265             Object object = message.getObject();
266 
267             if (object == null) {
268                 if (_log.isWarnEnabled()) {
269                     _log.warn("Message content is null");
270                 }
271 
272                 return;
273             }
274 
275             if (object instanceof Serializable) {
276                 handleNotification((Serializable)object);
277             }
278             else {
279                 if (_log.isWarnEnabled()) {
280                     _log.warn(
281                         "Unable to process message content of type " +
282                             object.getClass().getName());
283                 }
284             }
285         }
286 
287     }
288 
289 }