1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.NamedThreadFactory;
28
29 import java.util.List;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34
40 public abstract class BaseDestination implements Destination {
41
42 public BaseDestination(String name) {
43 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
44 }
45
46 public BaseDestination(
47 String name, int workersCoreSize, int workersMaxSize) {
48
49 _name = name;
50 _workersCoreSize = workersCoreSize;
51 _workersMaxSize = workersMaxSize;
52
53 open();
54 }
55
56 public synchronized void close() {
57 close(false);
58 }
59
60 public synchronized void close(boolean force) {
61 doClose(force);
62 }
63
64 public String getName() {
65 return _name;
66 }
67
68 public boolean isRegistered() {
69 if (_listenersCount > 0) {
70 return true;
71 }
72 else {
73 return false;
74 }
75 }
76
77 public synchronized void open() {
78 doOpen();
79 }
80
81 protected void doClose(boolean force) {
82 if (!_threadPoolExecutor.isShutdown() &&
83 !_threadPoolExecutor.isTerminating()) {
84
85 if (!force) {
86 _threadPoolExecutor.shutdown();
87 }
88 else {
89 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
90
91 if (_log.isInfoEnabled()) {
92 _log.info(
93 "The following " + pendingTasks.size() + " tasks " +
94 "were not executed due to shutown: " +
95 pendingTasks);
96 }
97 }
98 }
99 }
100
101 protected void doOpen() {
102 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
103 _threadPoolExecutor = new ThreadPoolExecutor(
104 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
105 new LinkedBlockingQueue<Runnable>(),
106 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
107 }
108 }
109
110 protected ThreadPoolExecutor getThreadPoolExecutor() {
111 return _threadPoolExecutor;
112 }
113
114 protected int getWorkersCoreSize() {
115 return _workersCoreSize;
116 }
117
118 protected int getWorkersMaxSize() {
119 return _workersMaxSize;
120 }
121
122 protected void setListenersCount(int listenersCount) {
123 _listenersCount = listenersCount;
124 }
125
126 private static final int _WORKERS_CORE_SIZE = 5;
127
128 private static final int _WORKERS_MAX_SIZE = 10;
129
130 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
131
132 private String _name;
133 private ThreadPoolExecutor _threadPoolExecutor;
134 private int _workersCoreSize;
135 private int _workersMaxSize;
136 private int _listenersCount;
137
138 }