Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 123:f4d6ea6969cc v2
async queue improvements
author | cin |
---|---|
date | Tue, 13 Jan 2015 01:42:38 +0300 |
parents | 0c8685c8b56b |
children | a336cb13c6a9 |
comparison
equal
deleted
inserted
replaced
122:0c8685c8b56b | 123:f4d6ea6969cc |
---|---|
384 // if the head is already updated then give up | 384 // if the head is already updated then give up |
385 return; | 385 return; |
386 | 386 |
387 } | 387 } |
388 | 388 |
389 public void Clear() { | |
390 // start the new queue | |
391 var t = new Chunk(m_chunkSize); | |
392 Thread.MemoryBarrier(); | |
393 m_last = t; | |
394 Thread.MemoryBarrier(); | |
395 | |
396 // make the new queue available to the readers, and stop the old one | |
397 m_first = t; | |
398 Thread.MemoryBarrier(); | |
399 } | |
400 | |
401 public T[] Drain() { | |
402 // start the new queue | |
403 var t = new Chunk(m_chunkSize); | |
404 Thread.MemoryBarrier(); | |
405 m_last = t; | |
406 Thread.MemoryBarrier(); | |
407 | |
408 // make the new queue available to the readers, and stop the old one | |
409 Chunk first; | |
410 | |
411 do { | |
412 first = m_first; | |
413 } while(first != Interlocked.CompareExchange(ref m_first | |
414 Thread.MemoryBarrier(); | |
415 | |
416 | |
417 } | |
418 | |
419 T[] ReadChunks(Chunk chunk) { | |
420 var result = new List<T>(); | |
421 var buffer = new T[m_chunkSize]; | |
422 int actual; | |
423 bool recycle; | |
424 while (chunk != null) { | |
425 // we need to read the chunk using this way | |
426 // since some client still may completing the dequeue | |
427 // operation, such clients most likely won't get results | |
428 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) | |
429 result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); | |
430 | |
431 chunk = chunk.next; | |
432 } | |
433 | |
434 return result.ToArray(); | |
435 } | |
436 | |
437 struct ArraySegmentCollection : ICollection<T> { | |
438 readonly T[] m_data; | |
439 readonly int m_offset; | |
440 readonly int m_length; | |
441 | |
442 public ArraySegmentCollection(T[] data, int offset, int length) { | |
443 m_data = data; | |
444 m_offset = offset; | |
445 m_length = length; | |
446 } | |
447 | |
448 #region ICollection implementation | |
449 | |
450 public void Add(T item) { | |
451 throw new InvalidOperationException(); | |
452 } | |
453 | |
454 public void Clear() { | |
455 throw new InvalidOperationException(); | |
456 } | |
457 | |
458 public bool Contains(T item) { | |
459 return false; | |
460 } | |
461 | |
462 public void CopyTo(T[] array, int arrayIndex) { | |
463 Array.Copy(m_data,m_offset,array,arrayIndex, m_length); | |
464 } | |
465 | |
466 public bool Remove(T item) { | |
467 throw new NotImplementedException(); | |
468 } | |
469 | |
470 public int Count { | |
471 get { | |
472 return m_length; | |
473 } | |
474 } | |
475 | |
476 public bool IsReadOnly { | |
477 get { | |
478 return true; | |
479 } | |
480 } | |
481 | |
482 #endregion | |
483 | |
484 #region IEnumerable implementation | |
485 | |
486 public IEnumerator<T> GetEnumerator() { | |
487 for (int i = m_offset; i < m_length + m_offset; i++) | |
488 yield return m_data[i]; | |
489 } | |
490 | |
491 #endregion | |
492 | |
493 #region IEnumerable implementation | |
494 | |
495 IEnumerator IEnumerable.GetEnumerator() { | |
496 return GetEnumerator(); | |
497 } | |
498 | |
499 #endregion | |
500 } | |
501 | |
389 #region IEnumerable implementation | 502 #region IEnumerable implementation |
390 | 503 |
391 class Enumerator : IEnumerator<T> { | 504 class Enumerator : IEnumerator<T> { |
392 Chunk m_current; | 505 Chunk m_current; |
393 int m_pos = -1; | 506 int m_pos = -1; |