ImplabNet: comparison of Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2
major update, added Drain method to AsyncQueue class
| author | cin |
|---|---|
| date | Thu, 15 Jan 2015 02:43:14 +0300 |
| parents | f4d6ea6969cc |
| children | f803565868a4 |
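
This changeset adds a `Drain()` method to `AsyncQueue<T>` that detaches the current chunk chain in a single step and returns its contents as an array, and it reworks `Clear()` to use the same detach technique. A minimal usage sketch follows; the parameterless constructor and the `Enqueue(T)` call are assumptions, since this hunk only shows the consumer side of the class.

```csharp
using System;
using System.Threading.Tasks;
using Implab.Parallels;

class DrainUsage {
    static void Main() {
        // constructor and Enqueue are assumed; the diff below only shows
        // Clear, Drain and ReadChunks
        var queue = new AsyncQueue<int>();

        // two producers push items concurrently
        Task.WaitAll(
            Task.Run(() => { for (var i = 0; i < 1000; i++) queue.Enqueue(i); }),
            Task.Run(() => { for (var i = 1000; i < 2000; i++) queue.Enqueue(i); })
        );

        // Drain() swaps in a fresh chunk chain and returns everything
        // queued so far as a single array
        int[] items = queue.Drain();
        Console.WriteLine("drained {0} items", items.Length);
    }
}
```

The side-by-side comparison of 123:f4d6ea6969cc and 124:a336cb13c6a9 follows.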
| 123:f4d6ea6969cc | 124:a336cb13c6a9 |
|---|---|
| 1 using System.Threading; | 1 using System.Threading; |
| 2 using System.Collections.Generic; | 2 using System.Collections.Generic; |
| 3 using System; | 3 using System; |
| 4 using System.Collections; | 4 using System.Collections; |
| 5 using System.Diagnostics; | |
| 5 | 6 |
| 6 namespace Implab.Parallels { | 7 namespace Implab.Parallels { |
| 7 public class AsyncQueue<T> : IEnumerable<T> { | 8 public class AsyncQueue<T> : IEnumerable<T> { |
| 8 class Chunk { | 9 class Chunk { |
| 9 public Chunk next; | 10 public Chunk next; |
| 56 | 57 |
| 57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { |
| 58 // spin wait for commit | 59 // spin wait for commit |
| 59 } | 60 } |
| 60 return true; | 61 return true; |
| 62 } | |
| 63 | |
| 64 /// <summary> | |
| 65 /// Prevents new space from being allocated in the chunk and waits for all pending write operations to complete | |
| 66 /// </summary> | |
| 67 public void Commit() { | |
| 68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); | |
| 69 | |
| 70 while (m_hi != actual) | |
| 71 Thread.MemoryBarrier(); | |
| 61 } | 72 } |
| 62 | 73 |
| 63 public bool TryDequeue(out T value, out bool recycle) { | 74 public bool TryDequeue(out T value, out bool recycle) { |
| 64 int low; | 75 int low; |
| 65 do { | 76 do { |
| 357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) |
| 358 return false; | 369 return false; |
| 359 | 370 |
| 360 if (last != null) | 371 if (last != null) |
| 361 last.next = chunk; | 372 last.next = chunk; |
| 362 else | 373 else { |
| 363 m_first = chunk; | 374 m_first = chunk; |
| 375 } | |
| 364 return true; | 376 return true; |
| 365 } | 377 } |
| 366 | 378 |
| 367 void RecycleFirstChunk(Chunk first) { | 379 void RecycleFirstChunk(Chunk first) { |
| 368 var next = first.next; | 380 var next = first.next; |
| 369 | 381 |
| 382 if (first != Interlocked.CompareExchange(ref m_first, next, first)) | |
| 383 return; | |
| 384 | |
| 370 if (next == null) { | 385 if (next == null) { |
| 371 // looks like this is the last chunk | 386 |
| 372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { |
| 388 /*while (first.next == null) | |
| 389 Thread.MemoryBarrier();*/ | |
| 390 | |
| 373 // race | 391 // race |
| 374 // maybe someone already recycled this chunk | 392 // someone already updated the tail, restore the pointer to the queue head |
| 375 // or a new chunk has been appended to the queue | 393 m_first = first; |
| 376 | |
| 377 return; // give up | |
| 378 } | 394 } |
| 379 // the tail is updated | 395 // the tail is updated |
| 380 } | 396 } |
| 381 | 397 |
| 382 // we need to update the head | 398 // we need to update the head |
| 383 Interlocked.CompareExchange(ref m_first, next, first); | 399 //Interlocked.CompareExchange(ref m_first, next, first); |
| 384 // if the head is already updated then give up | 400 // if the head is already updated then give up |
| 385 return; | 401 //return; |
| 386 | 402 |
| 387 } | 403 } |
| 388 | 404 |
| 389 public void Clear() { | 405 public void Clear() { |
| 390 // start the new queue | 406 // start the new queue |
| 391 var t = new Chunk(m_chunkSize); | 407 var chunk = new Chunk(m_chunkSize); |
| 392 Thread.MemoryBarrier(); | 408 |
| 393 m_last = t; | 409 do { |
| 394 Thread.MemoryBarrier(); | 410 Thread.MemoryBarrier(); |
| 395 | 411 var first = m_first; |
| 396 // make the new queue available to the readers, and stop the old one | 412 var last = m_last; |
| 397 m_first = t; | 413 |
| 398 Thread.MemoryBarrier(); | 414 if (last == null) // nothing to clear |
| 415 return; | |
| 416 | |
| 417 if (first == null || (first.next == null && first != last)) // inconsistency | |
| 418 continue; | |
| 419 | |
| 420 // here we create an inconsistency which forces the other threads to spin | |
| 421 // and prevents them from fetching, since chunk.next == null | |
| 422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
| 423 continue; // inconsistent | |
| 424 | |
| 425 m_last = chunk; | |
| 426 | |
| 427 return; | |
| 428 | |
| 429 } while(true); | |
| 399 } | 430 } |
| 400 | 431 |
| 401 public T[] Drain() { | 432 public T[] Drain() { |
| 402 // start the new queue | 433 // start the new queue |
| 403 var t = new Chunk(m_chunkSize); | 434 var chunk = new Chunk(m_chunkSize); |
| 404 Thread.MemoryBarrier(); | |
| 405 m_last = t; | |
| 406 Thread.MemoryBarrier(); | |
| 407 | |
| 408 // make the new queue available to the readers, and stop the old one | |
| 409 Chunk first; | |
| 410 | 435 |
| 411 do { | 436 do { |
| 412 first = m_first; | 437 Thread.MemoryBarrier(); |
| 413 } while(first != Interlocked.CompareExchange(ref m_first | 438 var first = m_first; |
| 414 Thread.MemoryBarrier(); | 439 var last = m_last; |
| 415 | 440 |
| 416 | 441 if (last == null) |
| 442 return new T[0]; | |
| 443 | |
| 444 if (first == null || (first.next == null && first != last)) | |
| 445 continue; | |
| 446 | |
| 447 // here we create an inconsistency which forces the other threads to spin | |
| 448 // and prevents them from fetching, since chunk.next == null | |
| 449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | |
| 450 continue; // inconsistent | |
| 451 | |
| 452 last = Interlocked.Exchange(ref m_last, chunk); | |
| 453 | |
| 454 return ReadChunks(first, last); | |
| 455 | |
| 456 } while(true); | |
| 417 } | 457 } |
| 418 | 458 |
| 419 T[] ReadChunks(Chunk chunk) { | 459 T[] ReadChunks(Chunk chunk, object last) { |
| 420 var result = new List<T>(); | 460 var result = new List<T>(); |
| 421 var buffer = new T[m_chunkSize]; | 461 var buffer = new T[m_chunkSize]; |
| 422 int actual; | 462 int actual; |
| 423 bool recycle; | 463 bool recycle; |
| 424 while (chunk != null) { | 464 while (chunk != null) { |
| 465 // ensure all write operations on the chunk are complete | |
| 466 chunk.Commit(); | |
| 467 | |
| 425 // we need to read the chunk this way | 468 // we need to read the chunk this way |
| 426 // since some client may still be completing a dequeue | 469 // since some client may still be completing a dequeue |
| 427 // operation; such clients most likely won't get results | 470 // operation; such clients most likely won't get results |
| 428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) |
| 429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); |
| 430 | 473 |
| 431 chunk = chunk.next; | 474 if (chunk == last) { |
| 475 chunk = null; | |
| 476 } else { | |
| 477 while (chunk.next == null) | |
| 478 Thread.MemoryBarrier(); | |
| 479 chunk = chunk.next; | |
| 480 } | |
| 432 } | 481 } |
| 433 | 482 |
| 434 return result.ToArray(); | 483 return result.ToArray(); |
| 435 } | 484 } |
| 436 | 485 |
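
The new `Chunk.Commit()` pairs with the existing reserve/commit counters: `Interlocked.Exchange` pushes `m_alloc` past the chunk capacity so no writer can reserve another slot, and the spin on `m_hi` waits until every already-reserved write has been published. The stand-in class below sketches that two-counter pattern with an `int` payload and a simplified `TryEnqueue`; it mirrors the field names from the diff but is not the library's `Chunk`.

```csharp
using System;
using System.Threading;

// Stand-in for the chunk's reserve/commit counters: m_alloc hands out
// slots, m_hi counts published writes. Field names follow the diff, but
// this is a simplified sketch, not the library's Chunk class.
class CommitSketch {
    readonly int[] m_data;
    readonly int m_size;
    int m_alloc; // next free slot (reservation counter)
    int m_hi;    // number of committed writes

    public CommitSketch(int size) {
        m_size = size;
        m_data = new int[size];
    }

    public bool TryEnqueue(int value) {
        int alloc;
        do {
            alloc = m_alloc;
            if (alloc >= m_size)
                return false; // chunk is full or has been sealed by Commit()
        } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));

        m_data[alloc] = value;

        // publish in order: wait until all earlier slots are committed
        while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
            // spin wait for commit
        }
        return true;
    }

    // Seal the chunk and wait for in-flight writers, mirroring the
    // Commit() method added in this changeset.
    public int Commit() {
        var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
        while (m_hi != actual)
            Thread.MemoryBarrier();
        return actual; // number of elements that are safe to read
    }
}
```

The `Math.Min` caps the exchanged value at `m_size`, so a chunk whose reservation counter has already run past its capacity is only waited on up to the slots that really exist.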
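
`Clear()` and `Drain()` share one idea: swap `m_first` to a freshly created chunk (which readers notice as an inconsistency and spin on), exchange `m_last`, and then walk the detached chain up to the old tail, committing and batch-reading each chunk. The sketch below shows the same detach-and-walk pattern on a deliberately simplified one-value-per-node chain; it assumes a single draining thread, whereas the real code guards against concurrent calls with the `CompareExchange`/retry loop visible in the diff.

```csharp
using System.Collections.Generic;
using System.Threading;

// One-value-per-node illustration of the detach pattern behind Drain():
// swap the head to a fresh sentinel, exchange the tail, then walk the
// detached chain, spinning until in-flight producers publish their links.
// Assumes a single draining thread; the real class retries with
// CompareExchange instead.
class DetachSketch<T> {
    class Node {
        public T value;
        public Node next;
    }

    Node m_first; // sentinel node, its value is never read
    Node m_last;

    public DetachSketch() {
        m_first = new Node();
        m_last = m_first;
    }

    public void Enqueue(T value) {
        var node = new Node { value = value };
        // claim the tail position, then link the previous tail to us;
        // a drainer that reaches the gap first will spin until the link appears
        var prev = Interlocked.Exchange(ref m_last, node);
        prev.next = node;
    }

    public T[] Drain() {
        var fresh = new Node();
        // detach the old chain by installing a fresh sentinel
        var oldFirst = Interlocked.Exchange(ref m_first, fresh);
        var oldLast = Interlocked.Exchange(ref m_last, fresh);

        // walk from the old sentinel up to the old tail
        var result = new List<T>();
        var node = oldFirst;
        while (node != oldLast) {
            while (node.next == null)
                Thread.MemoryBarrier(); // wait for a producer to publish its link
            node = node.next;
            result.Add(node.value);
        }
        return result.ToArray();
    }
}
```

The spin on `next` is the same trick the new `ReadChunks` uses: it waits on `chunk.next` with `Thread.MemoryBarrier()` before moving on, and stops at the chunk that was the tail at the moment of the swap.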
