Mercurial > pub > ImplabNet
view Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2
improved asyncpool usability
working on batch operations on asyncqueue
author | cin |
---|---|
date | Mon, 12 Jan 2015 05:19:52 +0300 |
parents | 2573b562e328 |
children | 62d2f1e98c4e |
line wrap: on
line source
using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    /// <summary>
    /// A FIFO queue stored as a singly linked list of fixed-size chunks.
    /// Producers and consumers coordinate through <see cref="Interlocked"/>
    /// operations instead of locks.
    /// </summary>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class AsyncQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A fixed-size segment of the queue. Slots are reserved through <c>m_alloc</c>,
        /// published through <c>m_hi</c> and consumed through <c>m_low</c>.
        /// </summary>
        class Chunk {
            // the chunk that follows this one, null while this chunk is the tail
            public Chunk next;

            int m_low;      // index of the next slot to dequeue
            int m_hi;       // index just past the last published slot
            int m_alloc;    // index of the next slot to reserve, may run past m_size
            readonly int m_size;
            readonly T[] m_data;

            /// <summary>Creates an empty chunk with <paramref name="size"/> slots.</summary>
            public Chunk(int size) {
                m_size = size;
                m_data = new T[size];
            }

            /// <summary>
            /// Creates a chunk with <paramref name="size"/> slots whose first slot
            /// already holds <paramref name="value"/>.
            /// </summary>
            public Chunk(int size, T value) {
                m_size = size;
                m_hi = 1;
                m_alloc = 1;
                m_data = new T[size];
                m_data[0] = value;
            }

            /// <summary>Index of the next slot to be dequeued.</summary>
            public int Low {
                get { return m_low; }
            }

            /// <summary>Index just past the last published slot.</summary>
            public int Hi {
                get { return m_hi; }
            }

            /// <summary>Tries to store a single value in this chunk.</summary>
            /// <param name="value">The value to store.</param>
            /// <param name="extend">
            /// Set to <c>true</c> only for the caller which reserved the index equal to
            /// the chunk size, i.e. the first one to find the chunk full.
            /// </param>
            /// <returns><c>true</c> if the value was stored; <c>false</c> if the chunk is full.</returns>
            public bool TryEnqueue(T value, out bool extend) {
                // every caller gets a unique index, even when the chunk is already full
                var alloc = Interlocked.Increment(ref m_alloc) - 1;

                if (alloc >= m_size) {
                    extend = alloc == m_size;
                    return false;
                }

                extend = false;
                m_data[alloc] = value;

                // publish the slot; slots are committed strictly in reservation order
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>Tries to take the oldest published value from this chunk.</summary>
            /// <param name="value">The dequeued value, or <c>default(T)</c> on failure.</param>
            /// <param name="recycle">
            /// <c>true</c> when the read position reached the end of the chunk (either the
            /// last slot has just been taken or it was taken before).
            /// </param>
            /// <returns><c>true</c> if a value was dequeued.</returns>
            public bool TryDequeue(out T value, out bool recycle) {
                int low;
                do {
                    low = m_low;
                    if (low >= m_hi) {
                        value = default(T);
                        recycle = (low == m_size);
                        return false;
                    }
                } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));

                recycle = (low == m_size - 1);
                value = m_data[low];
                return true;
            }

            /// <summary>
            /// Tries to store up to <paramref name="length"/> items of <paramref name="batch"/>,
            /// starting at <paramref name="offset"/>, in this chunk.
            /// </summary>
            /// <param name="batch">Source array.</param>
            /// <param name="offset">Index of the first item to copy from <paramref name="batch"/>.</param>
            /// <param name="length">Maximum number of items to copy.</param>
            /// <param name="enqueued">Number of items actually stored.</param>
            /// <param name="extend"><c>true</c> when the chunk was found exactly full.</param>
            /// <returns><c>true</c> if the reservation succeeded and the items were published.</returns>
            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
                int alloc;
                int allocSize;

                do {
                    alloc = m_alloc;

                    if (alloc > m_size) {
                        enqueued = 0;
                        extend = false;
                        return false;
                    }

                    // size the reservation from the same snapshot that is passed to the
                    // compare-exchange below; re-reading m_alloc here could observe a value
                    // beyond m_size and yield a negative or inconsistent size
                    allocSize = Math.Min(m_size - alloc, length);
                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));

                if (alloc == m_size) {
                    enqueued = 0;
                    extend = true;
                    return false;
                }

                Array.Copy(batch, offset, m_data, alloc, allocSize);
                enqueued = allocSize;
                extend = false;

                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>Returns the item stored in the slot <paramref name="pos"/>.</summary>
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        /// <summary>Number of slots in each chunk.</summary>
        public const int DEFAULT_CHUNK_SIZE = 32;

        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

        Chunk m_first;
        Chunk m_last;

        /// <summary>Creates an empty queue consisting of a single empty chunk.</summary>
        public AsyncQueue() {
            m_last = m_first = new Chunk(m_chunkSize);
        }

        /// <summary>Appends <paramref name="value"/> to the end of the queue.</summary>
        /// <param name="value">The value to append.</param>
        public void Enqueue(T value) {
            var last = m_last;
            // true until a chunk reports otherwise, this covers the 'no tail chunk' case
            bool extend = true;

            while (last == null || !last.TryEnqueue(value, out extend)) {
                if (extend || last == null) {
                    // try to extend the queue with a chunk which already holds the value
                    var chunk = new Chunk(m_chunkSize, value);
                    if (EnqueueChunk(last, chunk))
                        break;
                    // the tail has been changed by someone else, retry with the new one
                    last = m_last;
                } else {
                    // the chunk is full and another caller is responsible for extending
                    // the queue: spin wait for the new tail instead of hitting the full
                    // chunk again, which would only keep incrementing its allocation counter
                    while (last == m_last) {
                        Thread.MemoryBarrier();
                    }
                    last = m_last;
                }
            }
        }

        /// <summary>Tries to remove the item at the head of the queue.</summary>
        /// <param name="value">The removed item, or <c>default(T)</c> when the queue is empty.</param>
        /// <returns><c>true</c> if an item was removed; <c>false</c> if the queue is empty.</returns>
        public bool TryDequeue(out T value) {
            var chunk = m_first;
            bool recycle;

            while (chunk != null) {
                var result = chunk.TryDequeue(out value, out recycle);

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else
                    return result; // this chunk is usable and returned actual result

                if (result) // this chunk is waste but the true result is always actual
                    return true;

                // try again
                chunk = m_first;
            }

            // the queue is empty
            value = default(T);
            return false;
        }

        /// <summary>
        /// Tries to make <paramref name="chunk"/> the new tail, provided the tail is still
        /// <paramref name="last"/>.
        /// </summary>
        /// <returns><c>true</c> if the chunk was linked; <c>false</c> if the tail had changed.</returns>
        bool EnqueueChunk(Chunk last, Chunk chunk) {
            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
                return false;

            if (last != null)
                last.next = chunk;
            else
                m_first = chunk;
            return true;
        }

        /// <summary>Unlinks the exhausted chunk <paramref name="first"/> from the head of the queue.</summary>
        void RecycleFirstChunk(Chunk first) {
            var next = first.next;

            if (next == null) {
                // looks like this is the last chunk
                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                    // race
                    // maybe someone already recycled this chunk
                    // or a new chunk has been appended to the queue
                    return; // give up
                }
                // the tail is updated
            }

            // we need to update the head,
            // if the head is already updated then give up
            Interlocked.CompareExchange(ref m_first, next, first);
        }

        #region IEnumerable implementation

        /// <summary>
        /// Walks the chunks starting from the head and yields the published items.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            Chunk m_current;
            int m_pos = -1; // -1 means the enumeration has not started yet

            public Enumerator(Chunk first) {
                m_current = first;
            }

            #region IEnumerator implementation

            /// <summary>Advances to the next item.</summary>
            /// <returns>
            /// <c>true</c> if <see cref="Current"/> refers to an item; <c>false</c> once
            /// there are no more items.
            /// </returns>
            public bool MoveNext() {
                if (m_current == null)
                    return false;

                if (m_pos == -1)
                    m_pos = m_current.Low;
                else
                    m_pos++;

                // skip exhausted chunks; report the end of the sequence instead of
                // returning true with no current item
                while (m_pos >= m_current.Hi) {
                    m_current = m_current.next;
                    if (m_current == null)
                        return false;
                    m_pos = m_current.Low;
                }

                return true;
            }

            /// <summary>Not supported.</summary>
            /// <exception cref="NotSupportedException">Always thrown.</exception>
            public void Reset() {
                throw new NotSupportedException();
            }

            object IEnumerator.Current {
                get { return Current; }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            /// <summary>The item at the current position.</summary>
            /// <exception cref="InvalidOperationException">
            /// The enumeration has not started or has already finished.
            /// </exception>
            public T Current {
                get {
                    if (m_pos == -1 || m_current == null)
                        throw new InvalidOperationException();
                    return m_current.GetAt(m_pos);
                }
            }

            #endregion
        }

        /// <summary>Returns an enumerator starting at the current head chunk.</summary>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}