view Implab/Parallels/SimpleAsyncQueue.cs @ 242:cbe10ac0731e v3

Working on promises
author cin
date Wed, 24 Jan 2018 03:03:21 +0300
parents d6fe09f5592c
children
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    public class SimpleAsyncQueue<T> : IEnumerable<T> {
        class Node {
            public Node(T value) {
                this.value = value;
            }
            public readonly T value;
            public volatile Node next;
        }

        // the reader and the writer are mainteined completely independent,
        // the reader can read next item when m_first.next is not null
        // the writer creates a new node, moves m_last to this node and
        // only after that restores the reference from the previous node
        // making the reader be able to read the new node.

        volatile Node m_first; // position on the node which is already read
        volatile Node m_last; // position on the node which is already written

        public SimpleAsyncQueue() {
            m_first = m_last = new Node(default(T));
        }

        public void Enqueue(T value) {
            var next = new Node(value);

            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
            // to ensure that the next node is completely constructed
            var last = Interlocked.Exchange(ref m_last, next);

            // release-fence
            last.next = next;

        }

        public bool TryDequeue(out T value) {
            Node first = m_first; ;
            Node next = first.next; ;

            if (next == null) {
                value = default(T);
                return false;
            }

            var first2 = Interlocked.CompareExchange(ref m_first, next, first);

            if (first != first2) {
                // head is updated by someone else

                SpinWait spin = new SpinWait();
                do {
                    first = first2;
                    next = first.next;
                    if (next == null) {
                        value = default(T);
                        return false;
                    }

                    first2 = Interlocked.CompareExchange(ref m_first, next, first);
                    if (first == first2)
                        break;
                    spin.SpinOnce();
                } while (true);
            }

            value = next.value;
            return true;
        }

        #region IEnumerable implementation

        class Enumerator : IEnumerator<T> {
            Node m_current;
            Node m_first;

            public Enumerator(Node first) {
                m_first = first;
            }

            #region IEnumerator implementation

            public bool MoveNext() {
                m_current = m_current == null ? m_first : m_current.next;
                return m_current != null;
            }

            public void Reset() {
                m_current = null;
            }

            object IEnumerator.Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            public T Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            #endregion
        }

        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}