1   /**
2    * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
3    *
4    * The contents of this file are subject to the terms of the Liferay Enterprise
5    * Subscription License ("License"). You may not use this file except in
6    * compliance with the License. You can obtain a copy of the License by
7    * contacting Liferay, Inc. See the License for the specific language governing
8    * permissions and limitations under the License, including but not limited to
9    * distribution rights of the Software.
10   *
11   *
12   *
13   */
14  
15  package com.liferay.portal.kernel.concurrent;
16  
17  import java.util.concurrent.atomic.AtomicMarkableReference;
18  import java.util.concurrent.atomic.AtomicReference;
19  
20  /**
21   * <a href="BatchablePipe.java.html"><b><i>View Source</i></b></a>
22   *
23   * @author Shuyang Zhou
24   */
25  public class BatchablePipe<K, V> {
26  
27      public BatchablePipe() {
28          _headEntry = new Entry<K, V>(null);
29          _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
30      }
31  
32      public boolean put(IncreasableEntry<K, V> increasableEntry) {
33          Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
34  
35          while (true) {
36              if (_doIncrease(increasableEntry)) {
37                  return false;
38              }
39  
40              Entry<K, V> lastEntryLink = _lastEntryReference.get();
41              Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
42  
43              if (nextEntryLink == null) {
44                  if (lastEntryLink._nextEntry.compareAndSet(
45                          null, newEntry, false, false)) {
46  
47                      _lastEntryReference.set(newEntry);
48  
49                      return true;
50                  }
51              }
52              else {
53                  _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
54              }
55          }
56      }
57  
58      public IncreasableEntry<K, V> take() {
59          boolean[] marked = {false};
60  
61          Retry:
62  
63          while (true) {
64              Entry<K, V> predecessorEntry = _headEntry;
65              Entry<K, V> currentEntry =
66                  predecessorEntry._nextEntry.getReference();
67  
68              while (currentEntry != null) {
69                  Entry<K, V> successorEntry = currentEntry._nextEntry.get(
70                      marked);
71  
72                  if (marked[0]) {
73                      if (!predecessorEntry._nextEntry.compareAndSet(
74                              currentEntry, successorEntry, false, false)) {
75  
76                          continue Retry;
77                      }
78  
79                      currentEntry = predecessorEntry._nextEntry.getReference();
80  
81                      continue;
82                  }
83  
84                  if (currentEntry._nextEntry.compareAndSet(
85                          successorEntry, successorEntry, false, true)) {
86  
87                      return currentEntry._increasableEntry;
88                  }
89                  else {
90                      continue Retry;
91                  }
92              }
93  
94              return null;
95          }
96      }
97  
98      private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
99          boolean[] marked = {false};
100 
101         Retry:
102 
103         while (true) {
104             Entry<K, V> predecessorEntry = _headEntry;
105             Entry<K, V> currentEntry =
106                 predecessorEntry._nextEntry.getReference();
107 
108             while (currentEntry != null) {
109                 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
110                     marked);
111 
112                 if (marked[0]) {
113                     if (!predecessorEntry._nextEntry.compareAndSet(
114                             currentEntry, successorEntry, false, false)) {
115 
116                         continue Retry;
117                     }
118 
119                     currentEntry = predecessorEntry._nextEntry.getReference();
120 
121                     continue;
122                 }
123 
124                 if (currentEntry._increasableEntry.getKey().equals(
125                         increasableEntry.getKey())) {
126 
127                     return currentEntry._increasableEntry.increase(
128                         increasableEntry.getValue());
129                 }
130 
131                 predecessorEntry = currentEntry;
132                 currentEntry = successorEntry;
133             }
134 
135             _lastEntryReference.set(predecessorEntry);
136 
137             return false;
138         }
139     }
140 
141     private final Entry<K, V> _headEntry;
142     private final AtomicReference<Entry<K, V>> _lastEntryReference;
143 
144     private static class Entry<K, V> {
145 
146         private Entry(IncreasableEntry<K, V> increasableEntry) {
147             _increasableEntry = increasableEntry;
148             _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
149         }
150 
151         private IncreasableEntry<K, V> _increasableEntry;
152         private AtomicMarkableReference<Entry<K, V>> _nextEntry;
153 
154     }
155 
156 }