Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 123:f4d6ea6969cc v2
async queue improvements
author | cin |
---|---|
date | Tue, 13 Jan 2015 01:42:38 +0300 |
parents | 0c8685c8b56b |
children | a336cb13c6a9 |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Mon Jan 12 22:20:45 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Tue Jan 13 01:42:38 2015 +0300 @@ -386,6 +386,119 @@ } + public void Clear() { + // start the new queue + var t = new Chunk(m_chunkSize); + Thread.MemoryBarrier(); + m_last = t; + Thread.MemoryBarrier(); + + // make the new queue available to the readers, and stop the old one + m_first = t; + Thread.MemoryBarrier(); + } + + public T[] Drain() { + // start the new queue + var t = new Chunk(m_chunkSize); + Thread.MemoryBarrier(); + m_last = t; + Thread.MemoryBarrier(); + + // make the new queue available to the readers, and stop the old one + Chunk first; + + do { + first = m_first; + } while(first != Interlocked.CompareExchange(ref m_first + Thread.MemoryBarrier(); + + + } + + T[] ReadChunks(Chunk chunk) { + var result = new List<T>(); + var buffer = new T[m_chunkSize]; + int actual; + bool recycle; + while (chunk != null) { + // we need to read the chunk using this way + // since some client still may completing the dequeue + // operation, such clients most likely won't get results + while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) + result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); + + chunk = chunk.next; + } + + return result.ToArray(); + } + + struct ArraySegmentCollection : ICollection<T> { + readonly T[] m_data; + readonly int m_offset; + readonly int m_length; + + public ArraySegmentCollection(T[] data, int offset, int length) { + m_data = data; + m_offset = offset; + m_length = length; + } + + #region ICollection implementation + + public void Add(T item) { + throw new InvalidOperationException(); + } + + public void Clear() { + throw new InvalidOperationException(); + } + + public bool Contains(T item) { + return false; + } + + public void CopyTo(T[] array, int arrayIndex) { + Array.Copy(m_data,m_offset,array,arrayIndex, m_length); + } + + public bool Remove(T item) { + throw new NotImplementedException(); + } + + public int Count { + get { + return m_length; + } + } + + public bool IsReadOnly { + get { + return true; + } + } + + #endregion + + #region IEnumerable implementation + + public IEnumerator<T> GetEnumerator() { + for (int i = m_offset; i < m_length + m_offset; i++) + yield return m_data[i]; + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } + #region IEnumerable implementation class Enumerator : IEnumerator<T> {