119
|
1 using System.Threading;
|
|
2 using System.Collections.Generic;
|
|
3 using System;
|
|
4 using System.Collections;
|
|
5
|
|
6 namespace Implab.Parallels {
|
|
7 public class AsyncQueue<T> : IEnumerable<T> {
|
|
8 class Chunk {
|
|
9 public Chunk next;
|
|
10
|
|
11 int m_low;
|
|
12 int m_hi;
|
|
13 int m_alloc;
|
|
14 readonly int m_size;
|
|
15 readonly T[] m_data;
|
|
16
|
|
17 public Chunk(int size) {
|
|
18 m_size = size;
|
|
19 m_data = new T[size];
|
|
20 }
|
|
21
|
|
22 public Chunk(int size, T value) {
|
|
23 m_size = size;
|
|
24 m_hi = 1;
|
|
25 m_alloc = 1;
|
|
26 m_data = new T[size];
|
|
27 m_data[0] = value;
|
|
28 }
|
|
29
|
|
30 public int Low {
|
|
31 get { return m_low; }
|
|
32 }
|
|
33
|
|
34 public int Hi {
|
|
35 get { return m_hi; }
|
|
36 }
|
|
37
|
|
38 public bool TryEnqueue(T value,out bool extend) {
|
120
|
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
|
119
|
40
|
120
|
41 if (alloc >= m_size) {
|
|
42 extend = alloc == m_size;
|
119
|
43 return false;
|
|
44 }
|
120
|
45
|
|
46 extend = false;
|
119
|
47 m_data[alloc] = value;
|
|
48
|
|
49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
|
|
50 // spin wait for commit
|
|
51 }
|
|
52 return true;
|
|
53 }
|
|
54
|
|
55 public bool TryDequeue(out T value,out bool recycle) {
|
|
56 int low;
|
|
57 do {
|
|
58 low = m_low;
|
|
59 if (low >= m_hi) {
|
|
60 value = default(T);
|
|
61 recycle = (low == m_size);
|
|
62 return false;
|
|
63 }
|
|
64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
|
|
65
|
|
66 recycle = (low == m_size - 1);
|
|
67 value = m_data[low];
|
|
68
|
|
69 return true;
|
|
70 }
|
|
71
|
120
|
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
|
|
73 int alloc;
|
|
74 int allocSize;
|
|
75
|
|
76 do {
|
|
77 alloc = m_alloc;
|
|
78
|
|
79 if (alloc > m_size) {
|
|
80 enqueued = 0;
|
|
81 extend = false;
|
|
82 return false;
|
|
83 }
|
|
84
|
|
85 allocSize = Math.Min(m_size - m_alloc, length);
|
|
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
|
|
87
|
|
88 if (alloc == m_size) {
|
|
89 enqueued = 0;
|
|
90 extend = true;
|
|
91 return false;
|
|
92 }
|
|
93
|
|
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
|
|
95 enqueued = allocSize;
|
|
96 extend = false;
|
|
97
|
|
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
|
|
99 // spin wait for commit
|
|
100 }
|
|
101 return true;
|
|
102 }
|
|
103
|
119
|
104 public T GetAt(int pos) {
|
|
105 return m_data[pos];
|
|
106 }
|
|
107 }
|
|
108
|
|
109 public const int DEFAULT_CHUNK_SIZE = 32;
|
|
110
|
|
111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
|
|
112
|
|
113 Chunk m_first;
|
|
114 Chunk m_last;
|
|
115
|
|
116 public AsyncQueue() {
|
|
117 m_last = m_first = new Chunk(m_chunkSize);
|
|
118 }
|
|
119
|
|
120 public void Enqueue(T value) {
|
|
121 var last = m_last;
|
|
122 // spin wait to the new chunk
|
|
123 bool extend = true;
|
|
124 while(last == null || !last.TryEnqueue(value, out extend)) {
|
|
125 // try to extend queue
|
|
126 if (extend || last == null) {
|
|
127 var chunk = new Chunk(m_chunkSize, value);
|
|
128 if (EnqueueChunk(last, chunk))
|
|
129 break;
|
|
130 last = m_last;
|
|
131 } else {
|
|
132 while (last != m_last) {
|
|
133 Thread.MemoryBarrier();
|
|
134 last = m_last;
|
|
135 }
|
|
136 }
|
|
137 }
|
|
138 }
|
|
139
|
|
140 public bool TryDequeue(out T value) {
|
|
141 var chunk = m_first;
|
|
142 bool recycle;
|
|
143 while (chunk != null) {
|
|
144
|
|
145 var result = chunk.TryDequeue(out value, out recycle);
|
|
146
|
|
147 if (recycle) // this chunk is waste
|
|
148 RecycleFirstChunk(chunk);
|
|
149 else
|
|
150 return result; // this chunk is usable and returned actual result
|
|
151
|
|
152 if (result) // this chunk is waste but the true result is always actual
|
|
153 return true;
|
|
154
|
|
155 // try again
|
|
156 chunk = m_first;
|
|
157 }
|
|
158
|
|
159 // the queue is empty
|
|
160 value = default(T);
|
|
161 return false;
|
|
162 }
|
|
163
|
|
164 bool EnqueueChunk(Chunk last, Chunk chunk) {
|
|
165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
|
|
166 return false;
|
|
167
|
|
168 if (last != null)
|
|
169 last.next = chunk;
|
|
170 else
|
|
171 m_first = chunk;
|
|
172 return true;
|
|
173 }
|
|
174
|
|
175 void RecycleFirstChunk(Chunk first) {
|
|
176 var next = first.next;
|
|
177
|
|
178 if (next == null) {
|
|
179 // looks like this is the last chunk
|
|
180 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
|
|
181 // race
|
|
182 // maybe someone already recycled this chunk
|
|
183 // or a new chunk has been appedned to the queue
|
|
184
|
|
185 return; // give up
|
|
186 }
|
|
187 // the tail is updated
|
|
188 }
|
|
189
|
|
190 // we need to update the head
|
|
191 Interlocked.CompareExchange(ref m_first, next, first);
|
|
192 // if the head is already updated then give up
|
|
193 return;
|
|
194
|
|
195 }
|
|
196
|
|
197 #region IEnumerable implementation
|
|
198
|
|
199 class Enumerator : IEnumerator<T> {
|
|
200 Chunk m_current;
|
|
201 int m_pos = -1;
|
|
202
|
|
203 public Enumerator(Chunk fisrt) {
|
|
204 m_current = fisrt;
|
|
205 }
|
|
206
|
|
207 #region IEnumerator implementation
|
|
208
|
|
209 public bool MoveNext() {
|
|
210 if (m_current == null)
|
|
211 return false;
|
|
212
|
|
213 if (m_pos == -1)
|
|
214 m_pos = m_current.Low;
|
|
215 else
|
|
216 m_pos++;
|
|
217 if (m_pos == m_current.Hi) {
|
|
218 m_pos = 0;
|
|
219 m_current = m_current.next;
|
|
220 }
|
|
221
|
|
222 return true;
|
|
223 }
|
|
224
|
|
225 public void Reset() {
|
|
226 throw new NotSupportedException();
|
|
227 }
|
|
228
|
|
229 object IEnumerator.Current {
|
|
230 get {
|
|
231 return Current;
|
|
232 }
|
|
233 }
|
|
234
|
|
235 #endregion
|
|
236
|
|
237 #region IDisposable implementation
|
|
238
|
|
239 public void Dispose() {
|
|
240 }
|
|
241
|
|
242 #endregion
|
|
243
|
|
244 #region IEnumerator implementation
|
|
245
|
|
246 public T Current {
|
|
247 get {
|
|
248 if (m_pos == -1 || m_current == null)
|
|
249 throw new InvalidOperationException();
|
|
250 return m_current.GetAt(m_pos);
|
|
251 }
|
|
252 }
|
|
253
|
|
254 #endregion
|
|
255 }
|
|
256
|
|
257 public IEnumerator<T> GetEnumerator() {
|
|
258 return new Enumerator(m_first);
|
|
259 }
|
|
260
|
|
261 #endregion
|
|
262
|
|
263 #region IEnumerable implementation
|
|
264
|
|
265 IEnumerator IEnumerable.GetEnumerator() {
|
|
266 return GetEnumerator();
|
|
267 }
|
|
268
|
|
269 #endregion
|
|
270 }
|
|
271 }
|