001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036 import net.sf.ehcache.distribution.jgroups.JGroupSerializable;
037
038 import org.jgroups.Address;
039 import org.jgroups.JChannel;
040 import org.jgroups.Message;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 try {
056 _jChannel = new JChannel(channelProperties);
057
058 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059
060 _jChannel.connect(clusterName);
061
062 if (_log.isInfoEnabled()) {
063 _log.info(
064 "Create a new channel with properties " +
065 _jChannel.getProperties());
066 }
067 }
068 catch (Exception e) {
069 if (_log.isErrorEnabled()) {
070 _log.error("Unable to initialize channels", e);
071 }
072 }
073
074 _cacheManager = cacheManager;
075 }
076
077 public void dispose() throws CacheException {
078 if (_jChannel != null) {
079 _jChannel.close();
080 }
081 }
082
083 public Address getBusLocalAddress() {
084 return _jChannel.getAddress();
085 }
086
087 public List<Address> getBusMembership() {
088 return _jChannel.getView().getMembers();
089 }
090
091 @SuppressWarnings("rawtypes")
092 public List getElements(List list) {
093 return null;
094 }
095
096 public String getGuid() {
097 return null;
098 }
099
100 @SuppressWarnings("rawtypes")
101 public List getKeys() {
102 return null;
103 }
104
105 public String getName() {
106 return null;
107 }
108
109 public Element getQuiet(Serializable serializable) {
110 return null;
111 }
112
113 public String getScheme() {
114 return _SCHEME;
115 }
116
117 public long getTimeForClusterToForm() {
118 return 0;
119 }
120
121 public String getUrl() {
122 return null;
123 }
124
125 public String getUrlBase() {
126 return null;
127 }
128
129 public void handleNotification(Serializable serializable) {
130 if (serializable instanceof JGroupSerializable) {
131 handleJGroupsNotification((JGroupSerializable)serializable);
132 }
133 else if (serializable instanceof List<?>) {
134 List<?> valueList = (List<?>)serializable;
135
136 for (Object object : valueList) {
137 if (object instanceof JGroupSerializable) {
138 handleJGroupsNotification((JGroupSerializable)object);
139 }
140 }
141 }
142 }
143
144 public void init() {
145 }
146
147 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
148 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
149
150 cachePeers.add(this);
151
152 return cachePeers;
153 }
154
155 public void put(Element element) {
156 }
157
158 public void registerPeer(String string) {
159 }
160
161 public boolean remove(Serializable serializable) {
162 return false;
163 }
164
165 public void removeAll() {
166 }
167
168 @SuppressWarnings("rawtypes")
169 public void send(Address address, List eventMessages)
170 throws RemoteException {
171
172 ArrayList<JGroupSerializable> jGroupSerializables =
173 new ArrayList<JGroupSerializable>();
174
175 for (Object eventMessage : eventMessages) {
176 if (eventMessage instanceof JGroupEventMessage) {
177 JGroupEventMessage jGroupEventMessage =
178 (JGroupEventMessage)eventMessage;
179
180 JGroupSerializable jGroupSerializable = toJGroupSerializable(
181 jGroupEventMessage);
182
183 jGroupSerializables.add(jGroupSerializable);
184 }
185 else {
186 if (_log.isDebugEnabled()) {
187 _log.debug(
188 eventMessage + "is not a JGroupEventMessage type");
189 }
190 }
191 }
192
193 try {
194 _jChannel.send(address, null, jGroupSerializables);
195 }
196 catch (Throwable t) {
197 throw new RemoteException(t.getMessage());
198 }
199 }
200
201 @SuppressWarnings("rawtypes")
202 public void send(List eventMessages) throws RemoteException {
203 send(null, eventMessages);
204 }
205
206 public void unregisterPeer(String string) {
207 }
208
209 protected void handleJGroupsNotification(
210 JGroupSerializable jGroupSerializable) {
211
212 Cache cache = _cacheManager.getCache(
213 jGroupSerializable.getCacheName());
214
215 if (cache == null) {
216 return;
217 }
218
219 int event = jGroupSerializable.getEvent();
220 Serializable key = jGroupSerializable.getKey();
221 Serializable value = jGroupSerializable.getValue();
222
223 if ((event == JGroupEventMessage.REMOVE) &&
224 (cache.getQuiet(key) != null)) {
225
226 cache.remove(key, true);
227 }
228 else if (event == JGroupEventMessage.REMOVE_ALL) {
229 cache.removeAll(true);
230 }
231 else if (event == JGroupEventMessage.PUT) {
232 cache.put(new Element(key, value), true);
233 }
234 }
235
236 protected JGroupSerializable toJGroupSerializable(
237 JGroupEventMessage jGroupEventMessage) {
238
239 Element element = jGroupEventMessage.getElement();
240
241 Serializable value = null;
242
243 if (element != null) {
244 value = element.getValue();
245 }
246
247 return new JGroupSerializable(
248 jGroupEventMessage.getEvent(),
249 jGroupEventMessage.getSerializableKey(), value,
250 jGroupEventMessage.getCacheName());
251 }
252
253 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
254
255 private static final String _SCHEME = "JGroups";
256
257 private CacheManager _cacheManager;
258 private JChannel _jChannel;
259
260 private class EhcacheJGroupsReceiver extends BaseReceiver {
261
262 public void receive(Message message) {
263 Object object = message.getObject();
264
265 if (object == null) {
266 if (_log.isWarnEnabled()) {
267 _log.warn("Message content is null");
268 }
269
270 return;
271 }
272
273 if (object instanceof Serializable) {
274 handleNotification((Serializable)object);
275 }
276 else {
277 if (_log.isWarnEnabled()) {
278 _log.warn(
279 "Unable to process message content of type " +
280 object.getClass().getName());
281 }
282 }
283 }
284
285 }
286
287 }