Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2
major update, added Drain mathod to AsyncQueue class
author | cin |
---|---|
date | Thu, 15 Jan 2015 02:43:14 +0300 |
parents | f4d6ea6969cc |
children | f803565868a4 |
comparison
equal
deleted
inserted
replaced
123:f4d6ea6969cc | 124:a336cb13c6a9 |
---|---|
1 using System.Threading; | 1 using System.Threading; |
2 using System.Collections.Generic; | 2 using System.Collections.Generic; |
3 using System; | 3 using System; |
4 using System.Collections; | 4 using System.Collections; |
5 using System.Diagnostics; | |
5 | 6 |
6 namespace Implab.Parallels { | 7 namespace Implab.Parallels { |
7 public class AsyncQueue<T> : IEnumerable<T> { | 8 public class AsyncQueue<T> : IEnumerable<T> { |
8 class Chunk { | 9 class Chunk { |
9 public Chunk next; | 10 public Chunk next; |
56 | 57 |
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { |
58 // spin wait for commit | 59 // spin wait for commit |
59 } | 60 } |
60 return true; | 61 return true; |
62 } | |
63 | |
64 /// <summary> | |
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete | |
66 /// </summary> | |
67 public void Commit() { | |
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); | |
69 | |
70 while (m_hi != actual) | |
71 Thread.MemoryBarrier(); | |
61 } | 72 } |
62 | 73 |
63 public bool TryDequeue(out T value, out bool recycle) { | 74 public bool TryDequeue(out T value, out bool recycle) { |
64 int low; | 75 int low; |
65 do { | 76 do { |
357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) |
358 return false; | 369 return false; |
359 | 370 |
360 if (last != null) | 371 if (last != null) |
361 last.next = chunk; | 372 last.next = chunk; |
362 else | 373 else { |
363 m_first = chunk; | 374 m_first = chunk; |
375 } | |
364 return true; | 376 return true; |
365 } | 377 } |
366 | 378 |
367 void RecycleFirstChunk(Chunk first) { | 379 void RecycleFirstChunk(Chunk first) { |
368 var next = first.next; | 380 var next = first.next; |
369 | 381 |
382 if (first != Interlocked.CompareExchange(ref m_first, next, first)) | |
383 return; | |
384 | |
370 if (next == null) { | 385 if (next == null) { |
371 // looks like this is the last chunk | 386 |
372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
388 /*while (first.next == null) | |
389 Thread.MemoryBarrier();*/ | |
390 | |
373 // race | 391 // race |
374 // maybe someone already recycled this chunk | 392 // someone already updated the tail, restore the pointer to the queue head |
375 // or a new chunk has been appedned to the queue | 393 m_first = first; |
376 | |
377 return; // give up | |
378 } | 394 } |
379 // the tail is updated | 395 // the tail is updated |
380 } | 396 } |
381 | 397 |
382 // we need to update the head | 398 // we need to update the head |
383 Interlocked.CompareExchange(ref m_first, next, first); | 399 //Interlocked.CompareExchange(ref m_first, next, first); |
384 // if the head is already updated then give up | 400 // if the head is already updated then give up |
385 return; | 401 //return; |
386 | 402 |
387 } | 403 } |
388 | 404 |
389 public void Clear() { | 405 public void Clear() { |
390 // start the new queue | 406 // start the new queue |
391 var t = new Chunk(m_chunkSize); | 407 var chunk = new Chunk(m_chunkSize); |
392 Thread.MemoryBarrier(); | 408 |
393 m_last = t; | 409 do { |
394 Thread.MemoryBarrier(); | 410 Thread.MemoryBarrier(); |
395 | 411 var first = m_first; |
396 // make the new queue available to the readers, and stop the old one | 412 var last = m_last; |
397 m_first = t; | 413 |
398 Thread.MemoryBarrier(); | 414 if (last == null) // nothing to clear |
415 return; | |
416 | |
417 if (first == null || (first.next == null && first != last)) // inconcistency | |
418 continue; | |
419 | |
420 // here we will create inconsistency which will force others to spin | |
421 // and prevent from fetching. chunk.next = null | |
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
423 continue;// inconsistent | |
424 | |
425 m_last = chunk; | |
426 | |
427 return; | |
428 | |
429 } while(true); | |
399 } | 430 } |
400 | 431 |
401 public T[] Drain() { | 432 public T[] Drain() { |
402 // start the new queue | 433 // start the new queue |
403 var t = new Chunk(m_chunkSize); | 434 var chunk = new Chunk(m_chunkSize); |
404 Thread.MemoryBarrier(); | |
405 m_last = t; | |
406 Thread.MemoryBarrier(); | |
407 | |
408 // make the new queue available to the readers, and stop the old one | |
409 Chunk first; | |
410 | 435 |
411 do { | 436 do { |
412 first = m_first; | 437 Thread.MemoryBarrier(); |
413 } while(first != Interlocked.CompareExchange(ref m_first | 438 var first = m_first; |
414 Thread.MemoryBarrier(); | 439 var last = m_last; |
415 | 440 |
416 | 441 if (last == null) |
442 return new T[0]; | |
443 | |
444 if (first == null || (first.next == null && first != last)) | |
445 continue; | |
446 | |
447 // here we will create inconsistency which will force others to spin | |
448 // and prevent from fetching. chunk.next = null | |
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
450 continue;// inconsistent | |
451 | |
452 last = Interlocked.Exchange(ref m_last, chunk); | |
453 | |
454 return ReadChunks(first, last); | |
455 | |
456 } while(true); | |
417 } | 457 } |
418 | 458 |
419 T[] ReadChunks(Chunk chunk) { | 459 T[] ReadChunks(Chunk chunk, object last) { |
420 var result = new List<T>(); | 460 var result = new List<T>(); |
421 var buffer = new T[m_chunkSize]; | 461 var buffer = new T[m_chunkSize]; |
422 int actual; | 462 int actual; |
423 bool recycle; | 463 bool recycle; |
424 while (chunk != null) { | 464 while (chunk != null) { |
465 // ensure all write operations on the chunk are complete | |
466 chunk.Commit(); | |
467 | |
425 // we need to read the chunk using this way | 468 // we need to read the chunk using this way |
426 // since some client still may completing the dequeue | 469 // since some client still may completing the dequeue |
427 // operation, such clients most likely won't get results | 470 // operation, such clients most likely won't get results |
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) |
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); |
430 | 473 |
431 chunk = chunk.next; | 474 if (chunk == last) { |
475 chunk = null; | |
476 } else { | |
477 while (chunk.next == null) | |
478 Thread.MemoryBarrier(); | |
479 chunk = chunk.next; | |
480 } | |
432 } | 481 } |
433 | 482 |
434 return result.ToArray(); | 483 return result.ToArray(); |
435 } | 484 } |
436 | 485 |