1
14
15 package com.liferay.portal.increment;
16
17 import com.liferay.portal.kernel.concurrent.BatchablePipe;
18 import com.liferay.portal.kernel.log.Log;
19 import com.liferay.portal.kernel.log.LogFactoryUtil;
20 import com.liferay.portal.kernel.messaging.Message;
21 import com.liferay.portal.kernel.messaging.MessageListener;
22
23
29 public class BufferedIncrementMessageListener implements MessageListener {
30
31 public void receive(Message message) {
32 try {
33 doReceive(message);
34 }
35 catch (Exception e) {
36 _log.error("Unable to process message " + message, e);
37 }
38
39 }
40
41 @SuppressWarnings("rawtypes")
42 protected void doReceive(Message message) throws Exception {
43 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
44 (BatchablePipe<String, BufferedIncreasableEntry>)
45 message.getPayload();
46
47 while (true) {
48 BufferedIncreasableEntry bufferedIncreasableEntry =
49 (BufferedIncreasableEntry)batchablePipe.take();
50
51 if (bufferedIncreasableEntry == null) {
52 break;
53 }
54
55 try {
56 bufferedIncreasableEntry.proceed();
57 }
58 catch (Throwable t) {
59 _log.error(
60 "Cannot write buffered increment value to the database", t);
61 }
62 }
63
64 }
65
66 private static Log _log = LogFactoryUtil.getLog(
67 BufferedIncrementMessageListener.class);
68
69 }