1
19
20 package com.liferay.portal.kernel.messaging;
21
22 import com.liferay.portal.kernel.log.Log;
23 import com.liferay.portal.kernel.log.LogFactoryUtil;
24 import com.liferay.portal.kernel.util.NamedThreadFactory;
25
26 import java.util.List;
27 import java.util.concurrent.LinkedBlockingQueue;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import java.util.concurrent.TimeUnit;
30
31
37 public abstract class BaseDestination implements Destination {
38
39 public BaseDestination(String name) {
40 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
41 }
42
43 public BaseDestination(
44 String name, int workersCoreSize, int workersMaxSize) {
45
46 _name = name;
47 _workersCoreSize = workersCoreSize;
48 _workersMaxSize = workersMaxSize;
49
50 open();
51 }
52
53 public synchronized void close() {
54 close(false);
55 }
56
57 public synchronized void close(boolean force) {
58 doClose(force);
59 }
60
61 public DestinationStatistics getStatistics() {
62 DestinationStatistics statistics = new DestinationStatistics();
63
64 statistics.setActiveThreadCount(_threadPoolExecutor.getActiveCount());
65 statistics.setCurrentThreadCount(_threadPoolExecutor.getPoolSize());
66 statistics.setLargestThreadCount(
67 _threadPoolExecutor.getLargestPoolSize());
68 statistics.setMaxThreadPoolSize(
69 _threadPoolExecutor.getMaximumPoolSize());
70 statistics.setMinThreadPoolSize(_threadPoolExecutor.getCorePoolSize());
71 statistics.setPendingMessageCount(
72 _threadPoolExecutor.getQueue().size());
73 statistics.setSentMessageCount(
74 _threadPoolExecutor.getCompletedTaskCount());
75
76 return statistics;
77 }
78
79 public abstract int getListenerCount();
80
81 public String getName() {
82 return _name;
83 }
84
85 public boolean isRegistered() {
86 if (getListenerCount() > 0) {
87 return true;
88 }
89 else {
90 return false;
91 }
92 }
93
94 public synchronized void open() {
95 doOpen();
96 }
97
98 protected void doClose(boolean force) {
99 if (!_threadPoolExecutor.isShutdown() &&
100 !_threadPoolExecutor.isTerminating()) {
101
102 if (!force) {
103 _threadPoolExecutor.shutdown();
104 }
105 else {
106 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
107
108 if (_log.isInfoEnabled()) {
109 _log.info(
110 "The following " + pendingTasks.size() + " tasks " +
111 "were not executed due to shutown: " +
112 pendingTasks);
113 }
114 }
115 }
116 }
117
118 protected void doOpen() {
119 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
120 _threadPoolExecutor = new ThreadPoolExecutor(
121 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
122 new LinkedBlockingQueue<Runnable>(),
123 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
124 }
125 }
126
127 protected ThreadPoolExecutor getThreadPoolExecutor() {
128 return _threadPoolExecutor;
129 }
130
131 protected int getWorkersCoreSize() {
132 return _workersCoreSize;
133 }
134
135 protected int getWorkersMaxSize() {
136 return _workersMaxSize;
137 }
138
139 private static final int _WORKERS_CORE_SIZE = 2;
140
141 private static final int _WORKERS_MAX_SIZE = 5;
142
143 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
144
145 private String _name;
146 private ThreadPoolExecutor _threadPoolExecutor;
147 private int _workersCoreSize;
148 private int _workersMaxSize;
149
150 }