1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.NamedThreadFactory;
28
29 import java.util.List;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34
40 public abstract class BaseDestination implements Destination {
41
42 public BaseDestination(String name) {
43 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
44 }
45
46 public BaseDestination(
47 String name, int workersCoreSize, int workersMaxSize) {
48
49 _name = name;
50 _workersCoreSize = workersCoreSize;
51 _workersMaxSize = workersMaxSize;
52
53 open();
54 }
55
56 public synchronized void close() {
57 close(false);
58 }
59
60 public synchronized void close(boolean force) {
61 doClose(force);
62 }
63
64 public DestinationStatistics getStatistics() {
65 DestinationStatistics statistics = new DestinationStatistics();
66
67 statistics.setActiveThreadCount(_threadPoolExecutor.getActiveCount());
68 statistics.setCurrentThreadCount(_threadPoolExecutor.getPoolSize());
69 statistics.setLargestThreadCount(
70 _threadPoolExecutor.getLargestPoolSize());
71 statistics.setMaxThreadPoolSize(
72 _threadPoolExecutor.getMaximumPoolSize());
73 statistics.setMinThreadPoolSize(_threadPoolExecutor.getCorePoolSize());
74 statistics.setPendingMessageCount(
75 _threadPoolExecutor.getQueue().size());
76 statistics.setSentMessageCount(
77 _threadPoolExecutor.getCompletedTaskCount());
78
79 return statistics;
80 }
81
82 public abstract int getListenerCount();
83
84 public String getName() {
85 return _name;
86 }
87
88 public boolean isRegistered() {
89 if (getListenerCount() > 0) {
90 return true;
91 }
92 else {
93 return false;
94 }
95 }
96
97 public synchronized void open() {
98 doOpen();
99 }
100
101 protected void doClose(boolean force) {
102 if (!_threadPoolExecutor.isShutdown() &&
103 !_threadPoolExecutor.isTerminating()) {
104
105 if (!force) {
106 _threadPoolExecutor.shutdown();
107 }
108 else {
109 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
110
111 if (_log.isInfoEnabled()) {
112 _log.info(
113 "The following " + pendingTasks.size() + " tasks " +
114 "were not executed due to shutown: " +
115 pendingTasks);
116 }
117 }
118 }
119 }
120
121 protected void doOpen() {
122 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
123 _threadPoolExecutor = new ThreadPoolExecutor(
124 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
125 new LinkedBlockingQueue<Runnable>(),
126 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
127 }
128 }
129
130 protected ThreadPoolExecutor getThreadPoolExecutor() {
131 return _threadPoolExecutor;
132 }
133
134 protected int getWorkersCoreSize() {
135 return _workersCoreSize;
136 }
137
138 protected int getWorkersMaxSize() {
139 return _workersMaxSize;
140 }
141
142 private static final int _WORKERS_CORE_SIZE = 2;
143
144 private static final int _WORKERS_MAX_SIZE = 5;
145
146 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
147
148 private String _name;
149 private ThreadPoolExecutor _threadPoolExecutor;
150 private int _workersCoreSize;
151 private int _workersMaxSize;
152
153 }