1
14
15 package com.liferay.portal.kernel.concurrent;
16
17 import java.util.concurrent.atomic.AtomicMarkableReference;
18 import java.util.concurrent.atomic.AtomicReference;
19
20
25 public class BatchablePipe<K, V> {
26
27 public BatchablePipe() {
28 _headEntry = new Entry<K, V>(null);
29 _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
30 }
31
32 public boolean put(IncreasableEntry<K, V> increasableEntry) {
33 Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
34
35 while (true) {
36 if (_doIncrease(increasableEntry)) {
37 return false;
38 }
39
40 Entry<K, V> lastEntryLink = _lastEntryReference.get();
41 Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
42
43 if (nextEntryLink == null) {
44 if (lastEntryLink._nextEntry.compareAndSet(
45 null, newEntry, false, false)) {
46
47 _lastEntryReference.set(newEntry);
48
49 return true;
50 }
51 }
52 else {
53 _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
54 }
55 }
56 }
57
58 public IncreasableEntry<K, V> take() {
59 boolean[] marked = {false};
60
61 Retry:
62
63 while (true) {
64 Entry<K, V> predecessorEntry = _headEntry;
65 Entry<K, V> currentEntry =
66 predecessorEntry._nextEntry.getReference();
67
68 while (currentEntry != null) {
69 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
70 marked);
71
72 if (marked[0]) {
73 if (!predecessorEntry._nextEntry.compareAndSet(
74 currentEntry, successorEntry, false, false)) {
75
76 continue Retry;
77 }
78
79 currentEntry = predecessorEntry._nextEntry.getReference();
80
81 continue;
82 }
83
84 if (currentEntry._nextEntry.compareAndSet(
85 successorEntry, successorEntry, false, true)) {
86
87 return currentEntry._increasableEntry;
88 }
89 else {
90 continue Retry;
91 }
92 }
93
94 return null;
95 }
96 }
97
98 private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
99 boolean[] marked = {false};
100
101 Retry:
102
103 while (true) {
104 Entry<K, V> predecessorEntry = _headEntry;
105 Entry<K, V> currentEntry =
106 predecessorEntry._nextEntry.getReference();
107
108 while (currentEntry != null) {
109 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
110 marked);
111
112 if (marked[0]) {
113 if (!predecessorEntry._nextEntry.compareAndSet(
114 currentEntry, successorEntry, false, false)) {
115
116 continue Retry;
117 }
118
119 currentEntry = predecessorEntry._nextEntry.getReference();
120
121 continue;
122 }
123
124 if (currentEntry._increasableEntry.getKey().equals(
125 increasableEntry.getKey())) {
126
127 return currentEntry._increasableEntry.increase(
128 increasableEntry.getValue());
129 }
130
131 predecessorEntry = currentEntry;
132 currentEntry = successorEntry;
133 }
134
135 _lastEntryReference.set(predecessorEntry);
136
137 return false;
138 }
139 }
140
141 private final Entry<K, V> _headEntry;
142 private final AtomicReference<Entry<K, V>> _lastEntryReference;
143
144 private static class Entry<K, V> {
145
146 private Entry(IncreasableEntry<K, V> increasableEntry) {
147 _increasableEntry = increasableEntry;
148 _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
149 }
150
151 private IncreasableEntry<K, V> _increasableEntry;
152 private AtomicMarkableReference<Entry<K, V>> _nextEntry;
153
154 }
155
156 }