# HG changeset patch # User cin # Date 1421102558 -10800 # Node ID f4d6ea6969cc21006dedc2227988670dd4115be2 # Parent 0c8685c8b56b275c5c564c95c8187cebe284be4e async queue improvements diff -r 0c8685c8b56b -r f4d6ea6969cc Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs Mon Jan 12 22:20:45 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Tue Jan 13 01:42:38 2015 +0300 @@ -386,6 +386,119 @@ } + public void Clear() { + // start the new queue + var t = new Chunk(m_chunkSize); + Thread.MemoryBarrier(); + m_last = t; + Thread.MemoryBarrier(); + + // make the new queue available to the readers, and stop the old one + m_first = t; + Thread.MemoryBarrier(); + } + + public T[] Drain() { + // start the new queue + var t = new Chunk(m_chunkSize); + Thread.MemoryBarrier(); + m_last = t; + Thread.MemoryBarrier(); + + // make the new queue available to the readers, and stop the old one + Chunk first; + + do { + first = m_first; + } while(first != Interlocked.CompareExchange(ref m_first + Thread.MemoryBarrier(); + + + } + + T[] ReadChunks(Chunk chunk) { + var result = new List(); + var buffer = new T[m_chunkSize]; + int actual; + bool recycle; + while (chunk != null) { + // we need to read the chunk using this way + // since some client still may completing the dequeue + // operation, such clients most likely won't get results + while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) + result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); + + chunk = chunk.next; + } + + return result.ToArray(); + } + + struct ArraySegmentCollection : ICollection { + readonly T[] m_data; + readonly int m_offset; + readonly int m_length; + + public ArraySegmentCollection(T[] data, int offset, int length) { + m_data = data; + m_offset = offset; + m_length = length; + } + + #region ICollection implementation + + public void Add(T item) { + throw new InvalidOperationException(); + } + + public void Clear() { + throw new InvalidOperationException(); + } + + public bool Contains(T item) { + return false; + } + + public void CopyTo(T[] array, int arrayIndex) { + Array.Copy(m_data,m_offset,array,arrayIndex, m_length); + } + + public bool Remove(T item) { + throw new NotImplementedException(); + } + + public int Count { + get { + return m_length; + } + } + + public bool IsReadOnly { + get { + return true; + } + } + + #endregion + + #region IEnumerable implementation + + public IEnumerator GetEnumerator() { + for (int i = m_offset; i < m_length + m_offset; i++) + yield return m_data[i]; + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } + #region IEnumerable implementation class Enumerator : IEnumerator {