Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2
major update, added Drain method to AsyncQueue class
| author | cin | 
|---|---|
| date | Thu, 15 Jan 2015 02:43:14 +0300 | 
| parents | f4d6ea6969cc | 
| children | f803565868a4 | 
   comparison
  equal
  deleted
  inserted
  replaced
| 123:f4d6ea6969cc | 124:a336cb13c6a9 | 
|---|---|
| 1 using System.Threading; | 1 using System.Threading; | 
| 2 using System.Collections.Generic; | 2 using System.Collections.Generic; | 
| 3 using System; | 3 using System; | 
| 4 using System.Collections; | 4 using System.Collections; | 
| 5 using System.Diagnostics; | |
| 5 | 6 | 
| 6 namespace Implab.Parallels { | 7 namespace Implab.Parallels { | 
| 7 public class AsyncQueue<T> : IEnumerable<T> { | 8 public class AsyncQueue<T> : IEnumerable<T> { | 
| 8 class Chunk { | 9 class Chunk { | 
| 9 public Chunk next; | 10 public Chunk next; | 
| 56 | 57 | 
| 57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | 
| 58 // spin wait for commit | 59 // spin wait for commit | 
| 59 } | 60 } | 
| 60 return true; | 61 return true; | 
| 62 } | |
| 63 | |
| 64 /// <summary> | |
| 65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete | |
| 66 /// </summary> | |
| 67 public void Commit() { | |
| 68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); | |
| 69 | |
| 70 while (m_hi != actual) | |
| 71 Thread.MemoryBarrier(); | |
| 61 } | 72 } | 
| 62 | 73 | 
| 63 public bool TryDequeue(out T value, out bool recycle) { | 74 public bool TryDequeue(out T value, out bool recycle) { | 
| 64 int low; | 75 int low; | 
| 65 do { | 76 do { | 
| 357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | 
| 358 return false; | 369 return false; | 
| 359 | 370 | 
| 360 if (last != null) | 371 if (last != null) | 
| 361 last.next = chunk; | 372 last.next = chunk; | 
| 362 else | 373 else { | 
| 363 m_first = chunk; | 374 m_first = chunk; | 
| 375 } | |
| 364 return true; | 376 return true; | 
| 365 } | 377 } | 
| 366 | 378 | 
| 367 void RecycleFirstChunk(Chunk first) { | 379 void RecycleFirstChunk(Chunk first) { | 
| 368 var next = first.next; | 380 var next = first.next; | 
| 369 | 381 | 
| 382 if (first != Interlocked.CompareExchange(ref m_first, next, first)) | |
| 383 return; | |
| 384 | |
| 370 if (next == null) { | 385 if (next == null) { | 
| 371 // looks like this is the last chunk | 386 | 
| 372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | 
| 388 /*while (first.next == null) | |
| 389 Thread.MemoryBarrier();*/ | |
| 390 | |
| 373 // race | 391 // race | 
| 374 // maybe someone already recycled this chunk | 392 // someone already updated the tail, restore the pointer to the queue head | 
| 375 // or a new chunk has been appedned to the queue | 393 m_first = first; | 
| 376 | |
| 377 return; // give up | |
| 378 } | 394 } | 
| 379 // the tail is updated | 395 // the tail is updated | 
| 380 } | 396 } | 
| 381 | 397 | 
| 382 // we need to update the head | 398 // we need to update the head | 
| 383 Interlocked.CompareExchange(ref m_first, next, first); | 399 //Interlocked.CompareExchange(ref m_first, next, first); | 
| 384 // if the head is already updated then give up | 400 // if the head is already updated then give up | 
| 385 return; | 401 //return; | 
| 386 | 402 | 
| 387 } | 403 } | 
| 388 | 404 | 
| 389 public void Clear() { | 405 public void Clear() { | 
| 390 // start the new queue | 406 // start the new queue | 
| 391 var t = new Chunk(m_chunkSize); | 407 var chunk = new Chunk(m_chunkSize); | 
| 392 Thread.MemoryBarrier(); | 408 | 
| 393 m_last = t; | 409 do { | 
| 394 Thread.MemoryBarrier(); | 410 Thread.MemoryBarrier(); | 
| 395 | 411 var first = m_first; | 
| 396 // make the new queue available to the readers, and stop the old one | 412 var last = m_last; | 
| 397 m_first = t; | 413 | 
| 398 Thread.MemoryBarrier(); | 414 if (last == null) // nothing to clear | 
| 415 return; | |
| 416 | |
| 417 if (first == null || (first.next == null && first != last)) // inconcistency | |
| 418 continue; | |
| 419 | |
| 420 // here we will create inconsistency which will force others to spin | |
| 421 // and prevent from fetching. chunk.next = null | |
| 422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
| 423 continue;// inconsistent | |
| 424 | |
| 425 m_last = chunk; | |
| 426 | |
| 427 return; | |
| 428 | |
| 429 } while(true); | |
| 399 } | 430 } | 
| 400 | 431 | 
| 401 public T[] Drain() { | 432 public T[] Drain() { | 
| 402 // start the new queue | 433 // start the new queue | 
| 403 var t = new Chunk(m_chunkSize); | 434 var chunk = new Chunk(m_chunkSize); | 
| 404 Thread.MemoryBarrier(); | |
| 405 m_last = t; | |
| 406 Thread.MemoryBarrier(); | |
| 407 | |
| 408 // make the new queue available to the readers, and stop the old one | |
| 409 Chunk first; | |
| 410 | 435 | 
| 411 do { | 436 do { | 
| 412 first = m_first; | 437 Thread.MemoryBarrier(); | 
| 413 } while(first != Interlocked.CompareExchange(ref m_first | 438 var first = m_first; | 
| 414 Thread.MemoryBarrier(); | 439 var last = m_last; | 
| 415 | 440 | 
| 416 | 441 if (last == null) | 
| 442 return new T[0]; | |
| 443 | |
| 444 if (first == null || (first.next == null && first != last)) | |
| 445 continue; | |
| 446 | |
| 447 // here we will create inconsistency which will force others to spin | |
| 448 // and prevent from fetching. chunk.next = null | |
| 449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
| 450 continue;// inconsistent | |
| 451 | |
| 452 last = Interlocked.Exchange(ref m_last, chunk); | |
| 453 | |
| 454 return ReadChunks(first, last); | |
| 455 | |
| 456 } while(true); | |
| 417 } | 457 } | 
| 418 | 458 | 
| 419 T[] ReadChunks(Chunk chunk) { | 459 T[] ReadChunks(Chunk chunk, object last) { | 
| 420 var result = new List<T>(); | 460 var result = new List<T>(); | 
| 421 var buffer = new T[m_chunkSize]; | 461 var buffer = new T[m_chunkSize]; | 
| 422 int actual; | 462 int actual; | 
| 423 bool recycle; | 463 bool recycle; | 
| 424 while (chunk != null) { | 464 while (chunk != null) { | 
| 465 // ensure all write operations on the chunk are complete | |
| 466 chunk.Commit(); | |
| 467 | |
| 425 // we need to read the chunk using this way | 468 // we need to read the chunk using this way | 
| 426 // since some client still may completing the dequeue | 469 // since some client still may completing the dequeue | 
| 427 // operation, such clients most likely won't get results | 470 // operation, such clients most likely won't get results | 
| 428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | 
| 429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | 
| 430 | 473 | 
| 431 chunk = chunk.next; | 474 if (chunk == last) { | 
| 475 chunk = null; | |
| 476 } else { | |
| 477 while (chunk.next == null) | |
| 478 Thread.MemoryBarrier(); | |
| 479 chunk = chunk.next; | |
| 480 } | |
| 432 } | 481 } | 
| 433 | 482 | 
| 434 return result.ToArray(); | 483 return result.ToArray(); | 
| 435 } | 484 } | 
| 436 | 485 | 
