comparison Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2

major update, added Drain mathod to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents f4d6ea6969cc
children f803565868a4
comparison
equal deleted inserted replaced
123:f4d6ea6969cc 124:a336cb13c6a9
1 using System.Threading; 1 using System.Threading;
2 using System.Collections.Generic; 2 using System.Collections.Generic;
3 using System; 3 using System;
4 using System.Collections; 4 using System.Collections;
5 using System.Diagnostics;
5 6
6 namespace Implab.Parallels { 7 namespace Implab.Parallels {
7 public class AsyncQueue<T> : IEnumerable<T> { 8 public class AsyncQueue<T> : IEnumerable<T> {
8 class Chunk { 9 class Chunk {
9 public Chunk next; 10 public Chunk next;
56 57
57 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { 58 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
58 // spin wait for commit 59 // spin wait for commit
59 } 60 }
60 return true; 61 return true;
62 }
63
64 /// <summary>
65 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
66 /// </summary>
67 public void Commit() {
68 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
69
70 while (m_hi != actual)
71 Thread.MemoryBarrier();
61 } 72 }
62 73
63 public bool TryDequeue(out T value, out bool recycle) { 74 public bool TryDequeue(out T value, out bool recycle) {
64 int low; 75 int low;
65 do { 76 do {
357 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
358 return false; 369 return false;
359 370
360 if (last != null) 371 if (last != null)
361 last.next = chunk; 372 last.next = chunk;
362 else 373 else {
363 m_first = chunk; 374 m_first = chunk;
375 }
364 return true; 376 return true;
365 } 377 }
366 378
367 void RecycleFirstChunk(Chunk first) { 379 void RecycleFirstChunk(Chunk first) {
368 var next = first.next; 380 var next = first.next;
369 381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
370 if (next == null) { 385 if (next == null) {
371 // looks like this is the last chunk 386
372 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { 387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388 /*while (first.next == null)
389 Thread.MemoryBarrier();*/
390
373 // race 391 // race
374 // maybe someone already recycled this chunk 392 // someone already updated the tail, restore the pointer to the queue head
375 // or a new chunk has been appedned to the queue 393 m_first = first;
376
377 return; // give up
378 } 394 }
379 // the tail is updated 395 // the tail is updated
380 } 396 }
381 397
382 // we need to update the head 398 // we need to update the head
383 Interlocked.CompareExchange(ref m_first, next, first); 399 //Interlocked.CompareExchange(ref m_first, next, first);
384 // if the head is already updated then give up 400 // if the head is already updated then give up
385 return; 401 //return;
386 402
387 } 403 }
388 404
389 public void Clear() { 405 public void Clear() {
390 // start the new queue 406 // start the new queue
391 var t = new Chunk(m_chunkSize); 407 var chunk = new Chunk(m_chunkSize);
392 Thread.MemoryBarrier(); 408
393 m_last = t; 409 do {
394 Thread.MemoryBarrier(); 410 Thread.MemoryBarrier();
395 411 var first = m_first;
396 // make the new queue available to the readers, and stop the old one 412 var last = m_last;
397 m_first = t; 413
398 Thread.MemoryBarrier(); 414 if (last == null) // nothing to clear
415 return;
416
417 if (first == null || (first.next == null && first != last)) // inconcistency
418 continue;
419
420 // here we will create inconsistency which will force others to spin
421 // and prevent from fetching. chunk.next = null
422 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
423 continue;// inconsistent
424
425 m_last = chunk;
426
427 return;
428
429 } while(true);
399 } 430 }
400 431
401 public T[] Drain() { 432 public T[] Drain() {
402 // start the new queue 433 // start the new queue
403 var t = new Chunk(m_chunkSize); 434 var chunk = new Chunk(m_chunkSize);
404 Thread.MemoryBarrier();
405 m_last = t;
406 Thread.MemoryBarrier();
407
408 // make the new queue available to the readers, and stop the old one
409 Chunk first;
410 435
411 do { 436 do {
412 first = m_first; 437 Thread.MemoryBarrier();
413 } while(first != Interlocked.CompareExchange(ref m_first 438 var first = m_first;
414 Thread.MemoryBarrier(); 439 var last = m_last;
415 440
416 441 if (last == null)
442 return new T[0];
443
444 if (first == null || (first.next == null && first != last))
445 continue;
446
447 // here we will create inconsistency which will force others to spin
448 // and prevent from fetching. chunk.next = null
449 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
450 continue;// inconsistent
451
452 last = Interlocked.Exchange(ref m_last, chunk);
453
454 return ReadChunks(first, last);
455
456 } while(true);
417 } 457 }
418 458
419 T[] ReadChunks(Chunk chunk) { 459 T[] ReadChunks(Chunk chunk, object last) {
420 var result = new List<T>(); 460 var result = new List<T>();
421 var buffer = new T[m_chunkSize]; 461 var buffer = new T[m_chunkSize];
422 int actual; 462 int actual;
423 bool recycle; 463 bool recycle;
424 while (chunk != null) { 464 while (chunk != null) {
465 // ensure all write operations on the chunk are complete
466 chunk.Commit();
467
425 // we need to read the chunk using this way 468 // we need to read the chunk using this way
426 // since some client still may completing the dequeue 469 // since some client still may completing the dequeue
427 // operation, such clients most likely won't get results 470 // operation, such clients most likely won't get results
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) 471 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); 472 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
430 473
431 chunk = chunk.next; 474 if (chunk == last) {
475 chunk = null;
476 } else {
477 while (chunk.next == null)
478 Thread.MemoryBarrier();
479 chunk = chunk.next;
480 }
432 } 481 }
433 482
434 return result.ToArray(); 483 return result.ToArray();
435 } 484 }
436 485