1
14
15 package com.liferay.portal.cache.ehcache;
16
17 import com.liferay.portal.cluster.BaseReceiver;
18 import com.liferay.portal.kernel.log.Log;
19 import com.liferay.portal.kernel.log.LogFactoryUtil;
20
21 import java.io.Serializable;
22
23 import java.rmi.RemoteException;
24
25 import java.util.ArrayList;
26 import java.util.List;
27
28 import net.sf.ehcache.Cache;
29 import net.sf.ehcache.CacheException;
30 import net.sf.ehcache.CacheManager;
31 import net.sf.ehcache.Ehcache;
32 import net.sf.ehcache.Element;
33 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
34 import net.sf.ehcache.distribution.CachePeer;
35 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
36 import net.sf.ehcache.distribution.jgroups.JGroupSerializable;
37
38 import org.jgroups.Address;
39 import org.jgroups.JChannel;
40 import org.jgroups.Message;
41
42
51 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
52
53 public JGroupsManager(
54 CacheManager cacheManager, String clusterName,
55 String channelProperties) {
56
57 try {
58 _jChannel = new JChannel(channelProperties);
59
60 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
61
62 _jChannel.connect(clusterName);
63
64 if (_log.isInfoEnabled()) {
65 _log.info(
66 "Create a new channel with properties " +
67 _jChannel.getProperties());
68 }
69 }
70 catch (Exception e) {
71 if (_log.isErrorEnabled()) {
72 _log.error("Unable to initialize channels", e);
73 }
74 }
75
76 _cacheManager = cacheManager;
77 }
78
79 public void dispose() throws CacheException {
80 if (_jChannel != null) {
81 _jChannel.close();
82 }
83 }
84
85 public Address getBusLocalAddress() {
86 return _jChannel.getAddress();
87 }
88
89 public List<Address> getBusMembership() {
90 return _jChannel.getView().getMembers();
91 }
92
93 @SuppressWarnings("rawtypes")
94 public List getElements(List list) {
95 return null;
96 }
97
98 public String getGuid() {
99 return null;
100 }
101
102 @SuppressWarnings("rawtypes")
103 public List getKeys() {
104 return null;
105 }
106
107 public String getName() {
108 return null;
109 }
110
111 public Element getQuiet(Serializable serializable) {
112 return null;
113 }
114
115 public String getScheme() {
116 return _SCHEME;
117 }
118
119 public long getTimeForClusterToForm() {
120 return 0;
121 }
122
123 public String getUrl() {
124 return null;
125 }
126
127 public String getUrlBase() {
128 return null;
129 }
130
131 public void handleNotification(Serializable serializable) {
132 if (serializable instanceof JGroupSerializable) {
133 handleJGroupsNotification((JGroupSerializable)serializable);
134 }
135 else if (serializable instanceof List<?>) {
136 List<?> valueList = (List<?>)serializable;
137
138 for (Object object : valueList) {
139 if (object instanceof JGroupSerializable) {
140 handleJGroupsNotification((JGroupSerializable)object);
141 }
142 }
143 }
144 }
145
146 public void init() {
147 }
148
149 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
150 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
151
152 cachePeers.add(this);
153
154 return cachePeers;
155 }
156
157 public void put(Element element) {
158 }
159
160 public void registerPeer(String string) {
161 }
162
163 public boolean remove(Serializable serializable) {
164 return false;
165 }
166
167 public void removeAll() {
168 }
169
170 @SuppressWarnings("rawtypes")
171 public void send(Address address, List eventMessages)
172 throws RemoteException {
173
174 ArrayList<JGroupSerializable> jGroupSerializables =
175 new ArrayList<JGroupSerializable>();
176
177 for (Object eventMessage : eventMessages) {
178 if (eventMessage instanceof JGroupEventMessage) {
179 JGroupEventMessage jGroupEventMessage =
180 (JGroupEventMessage)eventMessage;
181
182 JGroupSerializable jGroupSerializable = toJGroupSerializable(
183 jGroupEventMessage);
184
185 jGroupSerializables.add(jGroupSerializable);
186 }
187 else {
188 if (_log.isDebugEnabled()) {
189 _log.debug(
190 eventMessage + "is not a JGroupEventMessage type");
191 }
192 }
193 }
194
195 try {
196 _jChannel.send(address, null, jGroupSerializables);
197 }
198 catch (Throwable t) {
199 throw new RemoteException(t.getMessage());
200 }
201 }
202
203 @SuppressWarnings("rawtypes")
204 public void send(List eventMessages) throws RemoteException {
205 send(null, eventMessages);
206 }
207
208 public void unregisterPeer(String string) {
209 }
210
211 protected void handleJGroupsNotification(
212 JGroupSerializable jGroupSerializable) {
213
214 Cache cache = _cacheManager.getCache(
215 jGroupSerializable.getCacheName());
216
217 if (cache == null) {
218 return;
219 }
220
221 int event = jGroupSerializable.getEvent();
222 Serializable key = jGroupSerializable.getKey();
223 Serializable value = jGroupSerializable.getValue();
224
225 if ((event == JGroupEventMessage.REMOVE) &&
226 (cache.getQuiet(key) != null)) {
227
228 cache.remove(key, true);
229 }
230 else if (event == JGroupEventMessage.REMOVE_ALL) {
231 cache.removeAll(true);
232 }
233 else if (event == JGroupEventMessage.PUT) {
234 cache.put(new Element(key, value), true);
235 }
236 }
237
238 protected JGroupSerializable toJGroupSerializable(
239 JGroupEventMessage jGroupEventMessage) {
240
241 Element element = jGroupEventMessage.getElement();
242
243 Serializable value = null;
244
245 if (element != null) {
246 value = element.getValue();
247 }
248
249 return new JGroupSerializable(
250 jGroupEventMessage.getEvent(),
251 jGroupEventMessage.getSerializableKey(), value,
252 jGroupEventMessage.getCacheName());
253 }
254
255 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
256
257 private static final String _SCHEME = "JGroups";
258
259 private CacheManager _cacheManager;
260 private JChannel _jChannel;
261
262 private class EhcacheJGroupsReceiver extends BaseReceiver {
263
264 public void receive(Message message) {
265 Object object = message.getObject();
266
267 if (object == null) {
268 if (_log.isWarnEnabled()) {
269 _log.warn("Message content is null");
270 }
271
272 return;
273 }
274
275 if (object instanceof Serializable) {
276 handleNotification((Serializable)object);
277 }
278 else {
279 if (_log.isWarnEnabled()) {
280 _log.warn(
281 "Unable to process message content of type " +
282 object.getClass().getName());
283 }
284 }
285 }
286
287 }
288
289 }