Mercurial > pub > ImplabNet
view Implab/Parallels/SimpleAsyncQueue.cs @ 245:b904e0a3ba72 v3
working on promises
author | cin |
---|---|
date | Fri, 26 Jan 2018 04:13:34 +0300 |
parents | cbe10ac0731e |
children |
line wrap: on
line source
using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// A FIFO queue implemented as a singly linked list of nodes which is
    /// modified only through interlocked operations and volatile writes
    /// (no locks are taken).
    /// </summary>
    /// <remarks>
    /// The list always starts with a node which has already been consumed
    /// (initially a dummy node created by the constructor); the live items
    /// are the nodes which follow it. The most recently dequeued node stays
    /// referenced by the queue until the next successful dequeue, so the
    /// value stored in it remains reachable for that long.
    /// </remarks>
    /// <typeparam name="T">The type of the items stored in the queue.</typeparam>
    public class SimpleAsyncQueue<T> : IEnumerable<T> {
        class Node {
            public Node(T value) {
                this.value = value;
            }

            public readonly T value;
            public volatile Node next;
        }

        // The reader and the writer are maintained completely independently:
        // the reader can read the next item when m_first.next is not null;
        // the writer creates a new node, moves m_last to this node and only
        // after that restores the reference from the previous node, which
        // makes the new node available to the reader.

        volatile Node m_first; // the node which has already been read
        volatile Node m_last; // the node which has already been written

        /// <summary>
        /// Creates an empty queue.
        /// </summary>
        public SimpleAsyncQueue() {
            // a dummy node plays the role of the "already read" node
            m_first = m_last = new Node(default(T));
        }

        /// <summary>
        /// Appends the specified value to the end of the queue.
        /// </summary>
        /// <param name="value">The value to append.</param>
        public void Enqueue(T value) {
            var next = new Node(value);

            // Interlocked.Exchange implies a full memory barrier, this ensures
            // that the new node is completely constructed before it is published
            var last = Interlocked.Exchange(ref m_last, next);

            // volatile write (release fence): until this link is restored the
            // readers stop at the previous node and can't see the new one
            last.next = next;
        }

        /// <summary>
        /// Tries to remove the item from the beginning of the queue.
        /// </summary>
        /// <param name="value">
        /// Receives the removed item, or <c>default(T)</c> when nothing was removed.
        /// </param>
        /// <returns>
        /// <c>true</c> if an item was removed; <c>false</c> if there was no
        /// linked node after the current head at the moment of the call.
        /// </returns>
        public bool TryDequeue(out T value) {
            var spin = new SpinWait();
            Node first = m_first;

            while (true) {
                Node next = first.next;
                if (next == null) {
                    // nothing is linked after the head (yet)
                    value = default(T);
                    return false;
                }

                // try to move the head forward, this succeeds only if nobody
                // else has moved it since we read it
                Node actual = Interlocked.CompareExchange(ref m_first, next, first);
                if (actual == first) {
                    value = next.value;
                    return true;
                }

                // the head was updated by someone else, retry from the head we observed
                first = actual;
                spin.SpinOnce();
            }
        }

        #region IEnumerable implementation

        class Enumerator : IEnumerator<T> {
            // the already consumed node which precedes the first live item,
            // it is never exposed through Current
            readonly Node m_head;

            // the node which holds Current, null when the enumerator isn't positioned on an item
            Node m_current;

            // set when the end of the list has been reached, cleared by Reset()
            bool m_finished;

            public Enumerator(Node head) {
                m_head = head;
            }

            public bool MoveNext() {
                if (m_finished)
                    return false;

                // the first step starts from the consumed head node, therefore
                // the enumeration begins with the node which follows it
                Node next = (m_current ?? m_head).next;
                if (next == null) {
                    m_current = null;
                    m_finished = true;
                    return false;
                }

                m_current = next;
                return true;
            }

            public void Reset() {
                m_current = null;
                m_finished = false;
            }

            public T Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            public void Dispose() {
            }
        }

        /// <summary>
        /// Returns an enumerator which walks the items following the head node
        /// observed at the moment of the call. The enumeration doesn't remove
        /// items from the queue.
        /// </summary>
        /// <returns>The enumerator over the items of the queue.</returns>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}