Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | |
children | f1b897999260 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/AsyncQueue.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,244 @@ +using System.Threading; +using System.Collections.Generic; +using System; +using System.Collections; + +namespace Implab.Parallels { + public class AsyncQueue<T> : IEnumerable<T> { + class Chunk { + public Chunk next; + + int m_low; + int m_hi; + int m_alloc; + readonly int m_size; + readonly T[] m_data; + + public Chunk(int size) { + m_size = size; + m_data = new T[size]; + } + + public Chunk(int size, T value) { + m_size = size; + m_hi = 1; + m_alloc = 1; + m_data = new T[size]; + m_data[0] = value; + } + + public int Low { + get { return m_low; } + } + + public int Hi { + get { return m_hi; } + } + + public bool TryEnqueue(T value,out bool extend) { + extend = false; + int alloc; + do { + alloc = m_alloc; + if (alloc > m_size) + return false; + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + + if (alloc == m_size) { + extend = true; + return false; + } + + m_data[alloc] = value; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + // spin wait for commit + } + return true; + } + + public bool TryDequeue(out T value,out bool recycle) { + int low; + do { + low = m_low; + if (low >= m_hi) { + value = default(T); + recycle = (low == m_size); + return false; + } + } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + + recycle = (low == m_size - 1); + value = m_data[low]; + + return true; + } + + public T GetAt(int pos) { + return m_data[pos]; + } + } + + public const int DEFAULT_CHUNK_SIZE = 32; + + readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; + + Chunk m_first; + Chunk m_last; + + public AsyncQueue() { + m_last = m_first = new Chunk(m_chunkSize); + } + + public void Enqueue(T value) { + var last = m_last; + // spin wait to the new chunk + bool extend = true; + while(last == null || !last.TryEnqueue(value, out extend)) { + // try to extend queue + if (extend || last == null) { + var chunk = new Chunk(m_chunkSize, value); + if (EnqueueChunk(last, chunk)) + break; + last = m_last; + } else { + while (last != m_last) { + Thread.MemoryBarrier(); + last = m_last; + } + } + } + } + + public bool TryDequeue(out T value) { + var chunk = m_first; + bool recycle; + while (chunk != null) { + + var result = chunk.TryDequeue(out value, out recycle); + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else + return result; // this chunk is usable and returned actual result + + if (result) // this chunk is waste but the true result is always actual + return true; + + // try again + chunk = m_first; + } + + // the queue is empty + value = default(T); + return false; + } + + bool EnqueueChunk(Chunk last, Chunk chunk) { + if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) + return false; + + if (last != null) + last.next = chunk; + else + m_first = chunk; + return true; + } + + void RecycleFirstChunk(Chunk first) { + var next = first.next; + + if (next == null) { + // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // race + // maybe someone already recycled this chunk + // or a new chunk has been appedned to the queue + + return; // give up + } + // the tail is updated + } + + // we need to update the head + Interlocked.CompareExchange(ref m_first, next, first); + // if the head is already updated then give up + return; + + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator<T> { + Chunk m_current; + int m_pos = -1; + + public Enumerator(Chunk fisrt) { + m_current = fisrt; + } + + #region IEnumerator implementation + + public bool MoveNext() { + if (m_current == null) + return false; + + if (m_pos == -1) + m_pos = m_current.Low; + else + m_pos++; + if (m_pos == m_current.Hi) { + m_pos = 0; + m_current = m_current.next; + } + + return true; + } + + public void Reset() { + throw new NotSupportedException(); + } + + object IEnumerator.Current { + get { + return Current; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public T Current { + get { + if (m_pos == -1 || m_current == null) + throw new InvalidOperationException(); + return m_current.GetAt(m_pos); + } + } + + #endregion + } + + public IEnumerator<T> GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +}