comparison Implab/Parallels/AsyncQueue.cs @ 123:f4d6ea6969cc v2

async queue improvements
author cin
date Tue, 13 Jan 2015 01:42:38 +0300
parents 0c8685c8b56b
children a336cb13c6a9
comparison
equal deleted inserted replaced
122:0c8685c8b56b 123:f4d6ea6969cc
384 // if the head is already updated then give up 384 // if the head is already updated then give up
385 return; 385 return;
386 386
387 } 387 }
388 388
/// <summary>
/// Drops the current contents of the queue by switching both ends of the
/// queue to a newly allocated empty chunk.
/// </summary>
public void Clear() {
    // allocate the chunk which becomes the new (empty) queue
    var fresh = new Chunk(m_chunkSize);
    Thread.MemoryBarrier();

    // the tail is switched first, the barrier publishes the write
    // before the head is touched
    m_last = fresh;
    Thread.MemoryBarrier();

    // switching the head exposes the new queue to the readers,
    // the previous chain of chunks is no longer referenced from here
    m_first = fresh;
    Thread.MemoryBarrier();
}
400
/// <summary>
/// Replaces the contents of the queue with a new empty chunk and returns
/// the elements which could be read from the detached chain of chunks.
/// </summary>
/// <returns>
/// The array produced by <c>ReadChunks</c> for the chunk which was the head
/// of the queue at the moment of the swap.
/// </returns>
public T[] Drain() {
    // start the new queue
    var t = new Chunk(m_chunkSize);
    Thread.MemoryBarrier();
    m_last = t;
    Thread.MemoryBarrier();

    // make the new queue available to the readers, and stop the old one
    Chunk first;

    // swap the head with a compare-and-swap loop: retry until the value we
    // observed in m_first is the one actually replaced, so the detached
    // head is known exactly
    // NOTE(review): the original statement was cut off after 'ref m_first';
    // the (t, first) arguments and the return below are reconstructed from
    // the surrounding code - confirm against the repository
    do {
        first = m_first;
    } while (first != Interlocked.CompareExchange(ref m_first, t, first));
    Thread.MemoryBarrier();

    // collect whatever is left in the detached chunks
    return ReadChunks(first);
}
418
/// <summary>
/// Walks the chain of chunks starting at <paramref name="chunk"/> and
/// gathers every element which can still be dequeued into one array.
/// </summary>
/// <param name="chunk">The first chunk of the chain, may be <c>null</c>.</param>
/// <returns>The collected elements, an empty array when nothing was read.</returns>
T[] ReadChunks(Chunk chunk) {
    var items = new List<T>();
    var batch = new T[m_chunkSize];

    for (var current = chunk; current != null; current = current.next) {
        int count;
        bool recycle;

        // The chunk is read through the regular batch dequeue operation
        // because some clients may still be completing their own dequeue
        // on it; such clients most likely won't get any results.
        while (current.TryDequeueBatch(batch, 0, batch.Length, out count, out recycle)) {
            items.AddRange(new ArraySegmentCollection(batch, 0, count));
        }
    }

    return items.ToArray();
}
436
/// <summary>
/// Read-only <see cref="ICollection{T}"/> view over a segment of an array.
/// The data is not copied, the view refers to the array passed to the
/// constructor.
/// </summary>
struct ArraySegmentCollection : ICollection<T> {
    readonly T[] m_data;
    readonly int m_offset;
    readonly int m_length;

    /// <summary>
    /// Creates the view over <paramref name="length"/> elements of
    /// <paramref name="data"/> starting at <paramref name="offset"/>.
    /// The arguments are stored as is, no range validation is performed.
    /// </summary>
    public ArraySegmentCollection(T[] data, int offset, int length) {
        m_data = data;
        m_offset = offset;
        m_length = length;
    }

    #region ICollection implementation

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Add(T item) {
        throw new InvalidOperationException();
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Clear() {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// Checks whether the segment contains the specified item.
    /// </summary>
    public bool Contains(T item) {
        // the length check keeps the default (empty) instance, whose array
        // is null, from reaching Array.IndexOf
        return m_length > 0 && Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
    }

    /// <summary>
    /// Copies the segment to <paramref name="array"/> starting at
    /// <paramref name="arrayIndex"/>.
    /// </summary>
    public void CopyTo(T[] array, int arrayIndex) {
        Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public bool Remove(T item) {
        // same exception as the other mutating members
        throw new InvalidOperationException();
    }

    /// <summary>The number of elements in the segment.</summary>
    public int Count {
        get {
            return m_length;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool IsReadOnly {
        get {
            return true;
        }
    }

    #endregion

    #region IEnumerable<T> implementation

    /// <summary>Enumerates the elements of the segment in the array order.</summary>
    public IEnumerator<T> GetEnumerator() {
        for (int i = m_offset; i < m_length + m_offset; i++)
            yield return m_data[i];
    }

    #endregion

    #region IEnumerable implementation

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
501
389 #region IEnumerable implementation 502 #region IEnumerable implementation
390 503
391 class Enumerator : IEnumerator<T> { 504 class Enumerator : IEnumerator<T> {
392 Chunk m_current; 505 Chunk m_current;
393 int m_pos = -1; 506 int m_pos = -1;