119
|
1 using System.Threading;
|
|
2 using System.Collections.Generic;
|
|
3 using System;
|
|
4 using System.Collections;
|
|
5
|
|
6 namespace Implab.Parallels {
|
|
7 public class AsyncQueue<T> : IEnumerable<T> {
|
|
/// <summary>
/// A fixed-capacity segment of the queue. Segments are linked through
/// <see cref="next"/>.
/// </summary>
class Chunk {
    /// <summary>The segment linked after this one, or <c>null</c>.</summary>
    public Chunk next;

    // index of the next element to be dequeued
    int m_low;
    // index one past the last element which was committed by a writer
    int m_hi;
    // index of the next slot to be reserved by a writer; it is allowed to grow past m_size
    int m_alloc;
    readonly int m_size;
    readonly T[] m_data;

    /// <summary>
    /// Creates an empty chunk.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    public Chunk(int size) {
        m_data = new T[size];
        m_size = size;
    }

    /// <summary>
    /// Creates a chunk which already contains a single element.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="value">The element stored in the first slot.</param>
    public Chunk(int size, T value)
        : this(size) {
        m_data[0] = value;
        m_alloc = 1;
        m_hi = 1;
    }

    /// <summary>
    /// Creates a chunk which is pre-filled from the specified buffer.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="data">The buffer to copy the elements from.</param>
    /// <param name="offset">The index in <paramref name="data"/> of the first element to copy.</param>
    /// <param name="length">The number of elements to copy; they become readable immediately.</param>
    /// <param name="alloc">The initial value of the slot reservation counter.</param>
    public Chunk(int size, T[] data, int offset, int length, int alloc)
        : this(size) {
        Array.Copy(data, offset, m_data, 0, length);
        m_hi = length;
        m_alloc = alloc;
    }

    /// <summary>The index of the next element to be dequeued.</summary>
    public int Low {
        get {
            return m_low;
        }
    }

    /// <summary>The index one past the last committed element.</summary>
    public int Hi {
        get {
            return m_hi;
        }
    }

    /// <summary>
    /// Tries to append a single element to the chunk.
    /// </summary>
    /// <returns><c>true</c> if the element was stored, <c>false</c> if no free slot was left.</returns>
    /// <param name="value">The element to append.</param>
    /// <param name="extend">
    /// Set to <c>true</c> only for the caller which reserved the slot with index
    /// equal to the chunk capacity; otherwise <c>false</c>.
    /// </param>
    public bool TryEnqueue(T value, out bool extend) {
        // reserve a slot; the counter keeps growing even when the chunk is full
        int slot = Interlocked.Increment(ref m_alloc) - 1;

        if (slot < m_size) {
            extend = false;
            m_data[slot] = value;

            // commit: m_hi is moved from slot to slot + 1, so the loop spins
            // until all the preceding slots are committed
            while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
            }
            return true;
        }

        extend = (slot == m_size);
        return false;
    }

    /// <summary>
    /// Tries to remove a single element from the chunk.
    /// </summary>
    /// <returns><c>true</c> if an element was removed, <c>false</c> otherwise.</returns>
    /// <param name="value">The removed element or <c>default(T)</c>.</param>
    /// <param name="recycle">
    /// Set to <c>true</c> when the read position has reached the capacity of the
    /// chunk, either by this call or before it.
    /// </param>
    public bool TryDequeue(out T value, out bool recycle) {
        int head;

        while (true) {
            head = m_low;

            if (head >= m_hi) {
                // nothing is committed beyond the read position
                value = default(T);
                recycle = (head == m_size);
                return false;
            }

            // claim the element at head
            if (Interlocked.CompareExchange(ref m_low, head + 1, head) == head)
                break;
        }

        value = m_data[head];
        recycle = (head == m_size - 1);
        return true;
    }

    /// <summary>
    /// Tries to append a part of the specified buffer to the chunk.
    /// </summary>
    /// <returns><c>true</c> if the slots were reserved and filled, <c>false</c> if the chunk was already full.</returns>
    /// <param name="batch">The buffer with the elements to append.</param>
    /// <param name="offset">The index of the first element in <paramref name="batch"/>.</param>
    /// <param name="length">The number of elements the caller wants to append.</param>
    /// <param name="enqueued">The number of elements which were actually appended.</param>
    /// <param name="extend">
    /// Set to <c>true</c> when the chunk could not hold the whole batch and this
    /// call reserved the extra slot past the requested space.
    /// </param>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
        int start;
        int count;

        // when the batch doesn't fit, one extra slot is reserved so the
        // reservation counter ends up greater than the capacity
        int overshoot = 0;

        while (true) {
            start = m_alloc;

            if (start > m_size) {
                // the counter is already past the capacity: nothing is added
                // and the caller is told not to extend the queue
                enqueued = 0;
                extend = false;
                return false;
            }

            count = Math.Min(m_size - start, length);
            if (count < length)
                overshoot = 1;

            if (Interlocked.CompareExchange(ref m_alloc, start + count + overshoot, start) == start)
                break;
        }

        extend = (overshoot != 0);
        enqueued = count;

        // the chunk was exactly full: count is 0 and only the extra slot was reserved
        if (start == m_size)
            return false;

        Array.Copy(batch, offset, m_data, start, count);

        // commit: spins until all the preceding slots are committed
        while (Interlocked.CompareExchange(ref m_hi, start + count, start) != start) {
        }
        return true;
    }

    /// <summary>
    /// Tries to remove up to <paramref name="length"/> elements from the chunk.
    /// </summary>
    /// <returns><c>true</c> if at least one element was removed, <c>false</c> otherwise.</returns>
    /// <param name="buffer">The buffer which receives the removed elements.</param>
    /// <param name="offset">The index in <paramref name="buffer"/> at which the elements are written.</param>
    /// <param name="length">The maximum number of elements to remove.</param>
    /// <param name="dequeued">The number of elements which were actually removed.</param>
    /// <param name="recycle">
    /// Set to <c>true</c> when the read position has reached the capacity of the
    /// chunk, either by this call or before it.
    /// </param>
    public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
        int head;
        int count;

        while (true) {
            head = m_low;
            int committed = m_hi;

            if (head >= committed) {
                dequeued = 0;
                // reported on every call, not only on the one which emptied the chunk
                recycle = (head == m_size);
                return false;
            }

            count = Math.Min(committed - head, length);

            // claim the range [head, head + count)
            if (Interlocked.CompareExchange(ref m_low, head + count, head) == head)
                break;
        }

        dequeued = count;
        recycle = (head == m_size - count);

        Array.Copy(m_data, head, buffer, offset, count);
        return true;
    }

    /// <summary>
    /// Returns the element stored in the specified slot.
    /// </summary>
    /// <param name="pos">The index of the slot.</param>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
|
|
146
|
|
147 public const int DEFAULT_CHUNK_SIZE = 32;
|
121
|
148 public const int MAX_CHUNK_SIZE = 262144;
|
119
|
149
|
|
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
|
|
151
|
|
152 Chunk m_first;
|
|
153 Chunk m_last;
|
|
154
|
|
/// <summary>
/// Creates an empty queue which consists of a single empty chunk.
/// </summary>
public AsyncQueue() {
    var initial = new Chunk(m_chunkSize);
    m_first = initial;
    m_last = initial;
}
|
|
158
|
121
|
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public void Enqueue(T value) {
    bool extend = true;
    Chunk tail = m_last;

    for (;;) {
        // the common case: the tail chunk has a free slot
        if (tail != null && tail.TryEnqueue(value, out extend))
            return;

        if (tail == null || extend) {
            // either there are no chunks or this call has been chosen
            // to append a new one, the value travels inside the new chunk
            if (EnqueueChunk(tail, new Chunk(m_chunkSize, value)))
                return;
        } else {
            // the tail chunk is full and this call wasn't chosen to extend
            // the queue: spin until the tail is replaced
            while (tail == m_last) {
                Thread.MemoryBarrier();
            }
        }

        tail = m_last;
    }
}
|
|
182
|
121
|
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
/// or the specified range doesn't fit into <paramref name="data"/>.
/// </exception>
public void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range is checked against the remaining space rather than by computing
    // offset + length: the sum can wrap around for large arguments and let an
    // invalid range through, which would fail later in Array.Copy after the
    // slots in the chunk have already been reserved.
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;

    bool extend;
    int enqueued;

    while (length > 0) {
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue

            while (length > 0) {

                var size = Math.Min(length, MAX_CHUNK_SIZE);

                // the reservation counter of the new chunk is initialized with
                // the whole remaining length, so when more data is left than
                // fits into this chunk the counter is already past its capacity
                var chunk = new Chunk(
                    Math.Max(size, m_chunkSize),
                    data,
                    offset,
                    size,
                    length // length >= size
                );

                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated, then proceed from the beginning
                    last = m_last;
                    break;
                }

                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue, if we successfully enqueued the data
            if (length == 0)
                break;

            // someone else is extending the queue:
            // spin until the tail is replaced
            while (last == m_last) {
                Thread.MemoryBarrier();
            }

            last = m_last;
        }
    }
}
|
|
251
|
|
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
public bool TryDequeue(out T value) {
    // the head is re-read on every pass since it may be replaced meanwhile
    for (Chunk head = m_first; head != null; head = m_first) {
        bool exhausted;
        bool taken = head.TryDequeue(out value, out exhausted);

        // the chunk is still usable, so its answer is final
        if (!exhausted)
            return taken;

        // the chunk is used up, drop it from the queue
        RecycleFirstChunk(head);

        // a value obtained from the used up chunk is still a valid result
        if (taken)
            return true;
    }

    // there are no chunks in the queue
    value = default(T);
    return false;
}
|
|
280
|
121
|
/// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
/// or the specified range doesn't fit into <paramref name="buffer"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range is checked against the remaining space rather than by computing
    // offset + length: the sum can wrap around for large arguments and let an
    // invalid range through, which would fail later in Array.Copy after the
    // elements have already been claimed from the chunk.
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;
    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is used up
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        if (length == 0)
            return true;

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return dequeued != 0;
}
|
|
324
|
|
/// <summary>
/// Tries to dequeue the data from the first chunk which has any: the call
/// returns after the first successful read and doesn't continue with the
/// following chunks.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
/// or the specified range doesn't fit into <paramref name="buffer"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range is checked against the remaining space rather than by computing
    // offset + length: the sum can wrap around for large arguments and let an
    // invalid range through, which would fail later in Array.Copy after the
    // elements have already been claimed from the chunk.
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is used up
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return false;
}
|
|
366
|
119
|
/// <summary>
/// Tries to make the specified chunk the tail of the queue.
/// </summary>
/// <returns>
/// <c>true</c> if the tail was replaced, <c>false</c> if the current tail
/// is not <paramref name="last"/> anymore.
/// </returns>
/// <param name="last">The chunk which is expected to be the current tail, can be <c>null</c>.</param>
/// <param name="chunk">The chunk to append.</param>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observed != last)
        return false;

    // the tail is replaced, now make the new chunk reachable from the head
    if (last == null)
        m_first = chunk;
    else
        last.next = chunk;

    return true;
}
|
|
377
|
|
/// <summary>
/// Removes the specified chunk from the head of the queue.
/// </summary>
/// <param name="first">The chunk which is expected to be the current head.</param>
void RecycleFirstChunk(Chunk first) {
    Chunk successor = first.next;

    // When the chunk has no successor it looks like the tail as well, so the
    // tail is cleared first. If the tail turns out to be something else then
    // either someone has already recycled this chunk or a new chunk has been
    // appended to the queue, in both cases give up.
    if (successor == null && Interlocked.CompareExchange(ref m_last, null, first) != first)
        return;

    // move the head forward, if the head is already updated nothing happens
    Interlocked.CompareExchange(ref m_first, successor, first);
}
|
|
399
|
|
400 #region IEnumerable implementation
|
|
401
|
|
/// <summary>
/// Walks the chunks starting from the one passed to the constructor and
/// yields the elements between the read position and the committed
/// position of every chunk.
/// </summary>
/// <remarks>
/// NOTE(review): the positions are read from the chunks without any
/// synchronization, the results of enumerating a queue which is being
/// modified concurrently should be confirmed separately.
/// </remarks>
class Enumerator : IEnumerator<T> {
    // the chunk which is being enumerated, null when the enumeration is finished
    Chunk m_current;
    // the position inside m_current, -1 until the first call to MoveNext
    int m_pos = -1;

    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>
    /// Advances to the next element.
    /// </summary>
    /// <returns>
    /// <c>true</c> if <see cref="Current"/> refers to an element,
    /// <c>false</c> when there are no more elements.
    /// </returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        if (m_pos == -1)
            m_pos = m_current.Low;
        else
            m_pos++;

        // Skip the chunks which have nothing left to read. The end of the
        // enumeration must be reported with false: reporting true here would
        // make the following access to Current throw, which is what happened
        // for an empty queue and after the last element.
        while (m_pos >= m_current.Hi) {
            m_current = m_current.next;
            if (m_current == null)
                return false;
            m_pos = m_current.Low;
        }

        return true;
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Always.</exception>
    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>
    /// Does nothing, the enumerator doesn't hold any resources.
    /// </summary>
    public void Dispose() {
    }

    #endregion

    #region IEnumerator implementation

    /// <summary>
    /// The element at the current position.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The enumeration hasn't been started yet or it's already finished.
    /// </exception>
    public T Current {
        get {
            if (m_pos == -1 || m_current == null)
                throw new InvalidOperationException();
            return m_current.GetAt(m_pos);
        }
    }

    #endregion
}
|
|
459
|
|
/// <summary>
/// Creates an enumerator which starts from the current first chunk of the queue.
/// </summary>
public IEnumerator<T> GetEnumerator() {
    Chunk head = m_first;
    return new Enumerator(head);
}
|
|
463
|
|
464 #endregion
|
|
465
|
|
466 #region IEnumerable implementation
|
|
467
|
|
/// <summary>
/// The non-generic counterpart, delegates to <see cref="GetEnumerator"/>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
|
|
471
|
|
472 #endregion
|
|
473 }
|
|
474 }
|