1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.kernel.concurrent;
16  
17  import java.util.Comparator;
18  import java.util.concurrent.atomic.AtomicInteger;
19  import java.util.concurrent.atomic.AtomicLong;
20  import java.util.concurrent.locks.Condition;
21  import java.util.concurrent.locks.ReentrantLock;
22  
/**
 * <a href="CoalescedPipe.java.html"><b><i>View Source</i></b></a>
 *
 * <p>
 * A first-in-first-out pipe that coalesces elements: an element handed to
 * {@link #put(Object)} is dropped, and only counted, when an equivalent element
 * is already pending. Two elements are equivalent when the comparator given at
 * construction time returns <code>0</code> for them or, when no comparator was
 * given, when the pending element's <code>equals</code> accepts the new one.
 * </p>
 *
 * <p>
 * Pending elements are kept in a singly linked list that starts with an empty
 * sentinel link. Appending is guarded by a put lock and removal by a take lock;
 * scanning for an equivalent element and taking a snapshot hold both locks,
 * always acquiring the put lock first.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class CoalescedPipe<E> {

    /**
     * Creates a pipe that coalesces elements using <code>equals</code>.
     */
    public CoalescedPipe() {
        this(null);
    }

    /**
     * Creates a pipe that coalesces elements the comparator reports as equal.
     *
     * @param comparator the comparator deciding whether a new element matches
     *        a pending one (a result of <code>0</code> means match), or
     *        <code>null</code> to fall back to <code>equals</code>
     */
    public CoalescedPipe(Comparator<? super E> comparator) {
        _comparator = comparator;
        _notEmptyCondition = _takeLock.newCondition();

        // The head is always a sentinel without an element; the first pending
        // element, if any, lives in the link after it.

        _headElementLink = new ElementLink<E>(null);
        _lastElementLink = _headElementLink;
    }

    /**
     * Returns how many elements have been dropped so far because an equivalent
     * element was already pending.
     *
     * @return the number of coalesced elements
     */
    public long coalescedCount() {
        return _coalescedCount.get();
    }

    /**
     * Returns how many elements are currently waiting to be taken.
     *
     * @return the number of pending elements
     */
    public int pendingCount() {
        return _pendingCount.get();
    }

    /**
     * Appends the element to the end of the pipe unless an equivalent element
     * is already pending, in which case the element is dropped and the
     * coalesced count is incremented.
     *
     * <p>
     * If the calling thread is interrupted while the pipe is being scanned for
     * an equivalent element, the scan is skipped, the element is appended
     * anyway and the thread's interrupt status is left set when this method
     * returns.
     * </p>
     *
     * @param  e the element to add
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the put lock
     * @throws NullPointerException if the element is <code>null</code>
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }

        int pendingElements = -1;

        _putLock.lockInterruptibly();

        try {
            if (_coalesceElement(e)) {
                return;
            }

            ElementLink<E> newElementLink = new ElementLink<E>(e);

            _lastElementLink._nextElementLink = newElementLink;
            _lastElementLink = newElementLink;

            pendingElements = _pendingCount.getAndIncrement();
        }
        finally {
            _putLock.unlock();
        }

        // Only the transition from empty to non-empty can have a taker
        // waiting, so that is the only time a signal is needed.

        if (pendingElements == 0) {
            _takeLock.lock();

            try {
                _notEmptyCondition.signal();
            }
            finally {
                _takeLock.unlock();
            }
        }
    }

    /**
     * Removes and returns the oldest pending element, waiting while the pipe
     * is empty.
     *
     * @return the oldest pending element
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the take lock or for an element to arrive
     */
    public E take() throws InterruptedException {
        E element = null;

        _takeLock.lockInterruptibly();

        try {
            while (_pendingCount.get() == 0) {
                _notEmptyCondition.await();
            }

            // Advance the sentinel: the first real link becomes the new empty
            // head and the old head is unlinked so it can be collected.

            ElementLink<E> garbageElementLink = _headElementLink;

            _headElementLink = garbageElementLink._nextElementLink;

            garbageElementLink._nextElementLink = null;

            element = _headElementLink._element;

            _headElementLink._element = null;

            int pendingElements = _pendingCount.getAndDecrement();

            // More elements remain, pass the wake up on to another taker.

            if (pendingElements > 1) {
                _notEmptyCondition.signal();
            }
        }
        finally {
            _takeLock.unlock();
        }

        return element;
    }

    /**
     * Returns the pending elements, oldest first, without removing them. Both
     * locks are held while the array is filled.
     *
     * @return a new array holding the currently pending elements
     */
    public Object[] takeSnapshot() {
        _putLock.lock();

        try {
            _takeLock.lock();

            try {
                Object[] pendingElements = new Object[_pendingCount.get()];

                ElementLink<E> currentElementLink =
                    _headElementLink._nextElementLink;

                for (int i = 0; currentElementLink != null; i++) {
                    pendingElements[i] = currentElementLink._element;

                    currentElementLink = currentElementLink._nextElementLink;
                }

                return pendingElements;
            }
            finally {
                _takeLock.unlock();
            }
        }
        finally {
            _putLock.unlock();
        }
    }

    /**
     * Scans the pending elements for one equivalent to the given element and,
     * if one is found, increments the coalesced count. Called by
     * {@link #put(Object)} with the put lock held; the take lock is acquired
     * for the duration of the scan.
     *
     * @param  e the element about to be added
     * @return <code>true</code> if an equivalent element is already pending;
     *         <code>false</code> if none was found or the scan was skipped
     *         because the thread was interrupted
     */
    private boolean _coalesceElement(E e) {
        try {
            _takeLock.lockInterruptibly();
        }
        catch (InterruptedException ie) {

            // Continue to let the current element enter the pipe, but restore
            // the interrupt status that the exception cleared so the caller
            // can still observe the interruption.

            Thread.currentThread().interrupt();

            return false;
        }

        try {
            ElementLink<E> currentElementLink =
                _headElementLink._nextElementLink;

            while (currentElementLink != null) {
                if (_isSameElement(currentElementLink._element, e)) {
                    _coalescedCount.incrementAndGet();

                    return true;
                }

                currentElementLink = currentElementLink._nextElementLink;
            }

            return false;
        }
        finally {
            _takeLock.unlock();
        }
    }

    /**
     * Tells whether a pending element and a new element are equivalent, using
     * the comparator when one was supplied and <code>equals</code> otherwise.
     */
    private boolean _isSameElement(E pendingElement, E newElement) {
        if (_comparator != null) {
            return _comparator.compare(pendingElement, newElement) == 0;
        }

        return pendingElement.equals(newElement);
    }

    private final AtomicLong _coalescedCount = new AtomicLong(0);
    private final Comparator<? super E> _comparator;
    private ElementLink<E> _headElementLink;
    private ElementLink<E> _lastElementLink;
    private final Condition _notEmptyCondition;
    private final AtomicInteger _pendingCount = new AtomicInteger(0);
    private final ReentrantLock _putLock = new ReentrantLock();
    private final ReentrantLock _takeLock = new ReentrantLock();

    /**
     * A node of the singly linked list that stores the pending elements.
     */
    private static class ElementLink<E> {

        private ElementLink(E element) {
            _element = element;
        }

        private E _element;
        private ElementLink<E> _nextElementLink;

    }

}