1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.util.NamedThreadFactory;
28
29 import java.util.List;
30 import java.util.concurrent.LinkedBlockingQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34
40 public abstract class BaseDestination implements Destination {
41
42 public BaseDestination(String name) {
43 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
44 }
45
46 public BaseDestination(
47 String name, int workersCoreSize, int workersMaxSize) {
48
49 _name = name;
50 _workersCoreSize = workersCoreSize;
51 _workersMaxSize = workersMaxSize;
52
53 open();
54 }
55
56 public synchronized void close() {
57 close(false);
58 }
59
60 public synchronized void close(boolean force) {
61 doClose(force);
62 }
63
64 public String getName() {
65 return _name;
66 }
67
68 public synchronized void open() {
69 doOpen();
70 }
71
72 protected void doClose(boolean force) {
73 if (!_threadPoolExecutor.isShutdown() &&
74 !_threadPoolExecutor.isTerminating()) {
75
76 if (!force) {
77 _threadPoolExecutor.shutdown();
78 }
79 else {
80 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
81
82 if (_log.isInfoEnabled()) {
83 _log.info(
84 "The following " + pendingTasks.size() + " tasks " +
85 "were not executed due to shutown: " +
86 pendingTasks);
87 }
88 }
89 }
90 }
91
92 protected void doOpen() {
93 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
94 _threadPoolExecutor = new ThreadPoolExecutor(
95 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
96 new LinkedBlockingQueue<Runnable>(),
97 new NamedThreadFactory(getName(), Thread.NORM_PRIORITY));
98 }
99 }
100
101 protected ThreadPoolExecutor getThreadPoolExecutor() {
102 return _threadPoolExecutor;
103 }
104
105 protected int getWorkersCoreSize() {
106 return _workersCoreSize;
107 }
108
109 protected int getWorkersMaxSize() {
110 return _workersMaxSize;
111 }
112
113 private static final int _WORKERS_CORE_SIZE = 5;
114
115 private static final int _WORKERS_MAX_SIZE = 10;
116
117 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
118
119 private String _name;
120 private ThreadPoolExecutor _threadPoolExecutor;
121 private int _workersCoreSize;
122 private int _workersMaxSize;
123
124 }