001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * The contents of this file are subject to the terms of the Liferay Enterprise
005     * Subscription License ("License"). You may not use this file except in
006     * compliance with the License. You can obtain a copy of the License by
007     * contacting Liferay, Inc. See the License for the specific language governing
008     * permissions and limitations under the License, including but not limited to
009     * distribution rights of the Software.
010     *
011     *
012     *
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Comparator;
018    import java.util.concurrent.atomic.AtomicInteger;
019    import java.util.concurrent.atomic.AtomicLong;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
/**
 * An unbounded FIFO pipe that drops ("coalesces") an element on {@link #put}
 * when an equal element is already pending.
 *
 * <p>
 * Equality is decided by the comparator passed to the constructor (a result
 * of {@code 0} means "same element") or, when no comparator is given, by
 * {@code equals}.
 * </p>
 *
 * <p>
 * Internally the pipe is a singly linked list with a dummy head link. The
 * tail of the list is modified under the put lock and the head under the
 * take lock; the atomic pending counter is what producers and consumers use
 * to decide when to signal or wait.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class CoalescedPipe<E> {

    /**
     * Creates a pipe that detects duplicates with {@code equals}.
     */
    public CoalescedPipe() {
        this(null);
    }

    /**
     * Creates a pipe that detects duplicates with the given comparator.
     *
     * @param comparator the comparator whose {@code compare} result of
     *        {@code 0} marks two elements as duplicates, or {@code null} to
     *        fall back to {@code equals}
     */
    public CoalescedPipe(Comparator<? super E> comparator) {
        _comparator = comparator;
        _notEmptyCondition = _takeLock.newCondition();

        // The head is always a dummy link; the first pending element lives in
        // the link after it.

        _headElementLink = new ElementLink<E>(null);
        _lastElementLink = _headElementLink;
    }

    /**
     * Returns how many elements have been dropped by {@link #put} because a
     * duplicate was already pending.
     *
     * @return the number of coalesced elements
     */
    public long coalescedCount() {
        return _coalescedCount.get();
    }

    /**
     * Returns how many elements are currently waiting to be taken.
     *
     * @return the number of pending elements
     */
    public int pendingCount() {
        return _pendingCount.get();
    }

    /**
     * Appends the element to the pipe unless a duplicate is already pending,
     * in which case the element is dropped and the coalesced counter is
     * incremented.
     *
     * <p>
     * If the calling thread is interrupted while the duplicate scan is waiting
     * for the take lock, the scan is skipped, the element is appended anyway
     * and the thread's interrupt status is left set when this method returns.
     * </p>
     *
     * @param  e the element to add, must not be {@code null}
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the put lock
     * @throws NullPointerException if {@code e} is {@code null}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) {
            throw new NullPointerException();
        }

        int pendingElements = -1;

        _putLock.lockInterruptibly();

        try {
            if (_coalesceElement(e)) {
                return;
            }

            ElementLink<E> newElementLink = new ElementLink<E>(e);

            _lastElementLink._nextElementLink = newElementLink;
            _lastElementLink = newElementLink;

            pendingElements = _pendingCount.getAndIncrement();
        }
        finally {
            _putLock.unlock();
        }

        // Only the transition from empty to non-empty can have a consumer
        // waiting, so that is the only time a signal is needed.

        if (pendingElements == 0) {
            _takeLock.lock();

            try {
                _notEmptyCondition.signal();
            }
            finally {
                _takeLock.unlock();
            }
        }
    }

    /**
     * Removes and returns the oldest pending element, waiting while the pipe
     * is empty.
     *
     * @return the oldest pending element
     * @throws InterruptedException if the thread is interrupted while
     *         acquiring the take lock or while waiting for an element
     */
    public E take() throws InterruptedException {
        E element = null;

        _takeLock.lockInterruptibly();

        try {
            while (_pendingCount.get() == 0) {
                _notEmptyCondition.await();
            }

            // Advance the head: the link holding the first pending element
            // becomes the new dummy head and the old dummy is unlinked.

            ElementLink<E> staleHeadElementLink = _headElementLink;

            _headElementLink = staleHeadElementLink._nextElementLink;

            staleHeadElementLink._nextElementLink = null;

            element = _headElementLink._element;

            _headElementLink._element = null;

            int pendingElements = _pendingCount.getAndDecrement();

            // More elements remain, wake up another waiting consumer.

            if (pendingElements > 1) {
                _notEmptyCondition.signal();
            }
        }
        finally {
            _takeLock.unlock();
        }

        return element;
    }

    /**
     * Returns the pending elements in FIFO order without removing them. Both
     * the put lock and the take lock are held while the copy is made.
     *
     * @return a new array holding the currently pending elements
     */
    public Object[] takeSnapshot() {
        _putLock.lock();

        try {
            _takeLock.lock();

            try {
                Object[] pendingElements = new Object[_pendingCount.get()];

                ElementLink<E> currentElementLink =
                    _headElementLink._nextElementLink;

                for (int i = 0; currentElementLink != null; i++) {
                    pendingElements[i] = currentElementLink._element;

                    currentElementLink = currentElementLink._nextElementLink;
                }

                return pendingElements;
            }
            finally {
                _takeLock.unlock();
            }
        }
        finally {
            _putLock.unlock();
        }
    }

    /**
     * Scans the pending elements, under the take lock, for a duplicate of the
     * given element and counts it as coalesced when one is found.
     *
     * @param  e the element about to be added
     * @return {@code true} if a duplicate is already pending and the element
     *         must be dropped; {@code false} otherwise, including when the
     *         scan was skipped because the thread was interrupted
     */
    private boolean _coalesceElement(E e) {
        try {
            _takeLock.lockInterruptibly();

            try {
                ElementLink<E> currentElementLink =
                    _headElementLink._nextElementLink;

                while (currentElementLink != null) {
                    if (_isDuplicate(currentElementLink._element, e)) {
                        _coalescedCount.incrementAndGet();

                        return true;
                    }

                    currentElementLink = currentElementLink._nextElementLink;
                }
            }
            finally {
                _takeLock.unlock();
            }
        }
        catch (InterruptedException ie) {

            // Continue to let the current element enter the pipe, but restore
            // the interrupt status so the caller can still observe it.

            Thread.currentThread().interrupt();
        }

        return false;
    }

    /**
     * Tells whether the pending element and the incoming element count as the
     * same element, using the comparator when present and {@code equals}
     * otherwise.
     */
    private boolean _isDuplicate(E pendingElement, E e) {
        if (_comparator != null) {
            return _comparator.compare(pendingElement, e) == 0;
        }

        return pendingElement.equals(e);
    }

    private final AtomicLong _coalescedCount = new AtomicLong(0);
    private final Comparator<? super E> _comparator;
    private ElementLink<E> _headElementLink;
    private ElementLink<E> _lastElementLink;
    private final Condition _notEmptyCondition;
    private final AtomicInteger _pendingCount = new AtomicInteger(0);
    private final ReentrantLock _putLock = new ReentrantLock();
    private final ReentrantLock _takeLock = new ReentrantLock();

    /**
     * A node of the singly linked list backing the pipe.
     */
    private static class ElementLink<E> {

        private ElementLink(E element) {
            _element = element;
        }

        private E _element;
        private ElementLink<E> _nextElementLink;

    }

}