1
14
15 package com.liferay.portal.kernel.concurrent;
16
17 import java.util.Comparator;
18 import java.util.concurrent.atomic.AtomicInteger;
19 import java.util.concurrent.atomic.AtomicLong;
20 import java.util.concurrent.locks.Condition;
21 import java.util.concurrent.locks.ReentrantLock;
22
23
28 public class CoalescedPipe<E> {
29
30 public CoalescedPipe() {
31 this(null);
32 }
33
34 public CoalescedPipe(Comparator<E> comparator) {
35 _comparator = comparator;
36 _notEmptyCondition = _takeLock.newCondition();
37
38 _headElementLink = new ElementLink<E>(null);
39 _lastElementLink = _headElementLink;
40 }
41
42 public long coalescedCount() {
43 return _coalescedCount.get();
44 }
45
46 public int pendingCount() {
47 return _pendingCount.get();
48 }
49
50 public void put(E e) throws InterruptedException {
51 if (e == null) {
52 throw new NullPointerException();
53 }
54
55 int pendingElements = -1;
56
57 _putLock.lockInterruptibly();
58
59 try {
60 if (_coalesceElement(e)) {
61 return;
62 }
63
64 _lastElementLink._nextElementLink = new ElementLink<E>(e);
65
66 _lastElementLink = _lastElementLink._nextElementLink;
67
68 pendingElements = _pendingCount.getAndIncrement();
69 }
70 finally {
71 _putLock.unlock();
72 }
73
74 if (pendingElements == 0) {
75 _takeLock.lock();
76
77 try {
78 _notEmptyCondition.signal();
79 }
80 finally {
81 _takeLock.unlock();
82 }
83 }
84 }
85
86 public E take() throws InterruptedException {
87 E element = null;
88
89 _takeLock.lockInterruptibly();
90
91 try {
92 while (_pendingCount.get() == 0) {
93 _notEmptyCondition.await();
94 }
95
96 ElementLink<E> garbageELementLink = _headElementLink;
97
98 _headElementLink = _headElementLink._nextElementLink;
99
100 garbageELementLink._nextElementLink = null;
101
102 element = _headElementLink._element;
103
104 _headElementLink._element = null;
105
106 int pendingElements = _pendingCount.getAndDecrement();
107
108 if (pendingElements > 1) {
109 _notEmptyCondition.signal();
110 }
111 }
112 finally {
113 _takeLock.unlock();
114 }
115
116 return element;
117 }
118
119 public Object[] takeSnapshot() {
120 _putLock.lock();
121 _takeLock.lock();
122
123 try {
124 Object[] pendingElements = new Object[_pendingCount.get()];
125
126 ElementLink<E> currentElementLink =
127 _headElementLink._nextElementLink;
128
129 for (int i = 0; currentElementLink != null; i++) {
130 pendingElements[i] = currentElementLink._element;
131
132 currentElementLink = currentElementLink._nextElementLink;
133 }
134
135 return pendingElements;
136 }
137 finally {
138 _putLock.unlock();
139 _takeLock.unlock();
140 }
141 }
142
143 private boolean _coalesceElement(E e) {
144 try {
145 _takeLock.lockInterruptibly();
146
147 try {
148 ElementLink<E> currentElementLink =
149 _headElementLink._nextElementLink;
150
151 if (_comparator != null) {
152 while (currentElementLink != null) {
153 if (_comparator.compare(
154 currentElementLink._element, e) == 0) {
155
156 _coalescedCount.incrementAndGet();
157
158 return true;
159 }
160 else {
161 currentElementLink =
162 currentElementLink._nextElementLink;
163 }
164 }
165 }
166 else {
167 while (currentElementLink != null) {
168 if (currentElementLink._element.equals(e)) {
169 _coalescedCount.incrementAndGet();
170
171 return true;
172 }
173 else {
174 currentElementLink =
175 currentElementLink._nextElementLink;
176 }
177 }
178 }
179 }
180 finally {
181 _takeLock.unlock();
182 }
183 }
184 catch (InterruptedException ie) {
185
186
188 }
189
190 return false;
191 }
192
193 private final AtomicLong _coalescedCount = new AtomicLong(0);
194 private final Comparator<E> _comparator;
195 private ElementLink<E> _headElementLink;
196 private ElementLink<E> _lastElementLink;
197 private final Condition _notEmptyCondition;
198 private final AtomicInteger _pendingCount = new AtomicInteger(0);
199 private final ReentrantLock _putLock = new ReentrantLock();
200 private final ReentrantLock _takeLock = new ReentrantLock();
201
202 private static class ElementLink<E> {
203
204 private ElementLink(E element) {
205 _element = element;
206 }
207
208 private E _element;
209 private ElementLink<E> _nextElementLink;
210
211 }
212
213 }