view Implab/Parallels/SimpleAsyncQueue.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents
children cbe10ac0731e
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    /// <summary>
    /// A FIFO queue implemented as a singly linked list with a sentinel head node,
    /// synchronized with interlocked operations instead of locks.
    /// </summary>
    /// <remarks>
    /// The reader and the writer sides are maintained completely independently:
    /// the reader can take the next item as soon as <c>m_first.next</c> is not null;
    /// the writer creates a new node, moves <c>m_last</c> to it and only after that
    /// links the previous node to the new one, which makes the new node visible
    /// to the reader.
    /// </remarks>
    /// <typeparam name="T">The type of the items stored in the queue.</typeparam>
    public class SimpleAsyncQueue<T> : IEnumerable<T> {
        /// <summary>A single link of the list; <c>next</c> is null until a successor is published.</summary>
        class Node {
            public Node(T value) {
                this.value = value;
            }
            public readonly T value;
            public volatile Node next;
        }

        Node m_first; // sentinel: the node which is already read, the first pending item is m_first.next
        Node m_last; // the node which was written last

        /// <summary>
        /// Creates an empty queue; a placeholder node holding <c>default(T)</c>
        /// serves as the initial sentinel and is never returned to callers.
        /// </summary>
        public SimpleAsyncQueue() {
            m_first = m_last = new Node(default(T));
        }

        /// <summary>Appends <paramref name="value"/> to the tail of the queue.</summary>
        /// <param name="value">The item to add.</param>
        public void Enqueue(T value) {
            var next = new Node(value);

            // Interlocked.Exchange implies a full memory barrier, which ensures
            // that the new node is completely constructed before it is published
            var last = Interlocked.Exchange(ref m_last, next);

            // volatile write (release-fence): from this point readers can reach the new node.
            // Until this line runs the node is the tail but is not yet reachable from the head.
            last.next = next;
        }

        /// <summary>Tries to remove the item at the head of the queue.</summary>
        /// <param name="value">
        /// Receives the removed item, or <c>default(T)</c> when nothing was removed.
        /// </param>
        /// <returns>
        /// <c>true</c> if an item was removed; <c>false</c> if no item was reachable
        /// from the head at the moment of the call.
        /// </returns>
        public bool TryDequeue(out T value) {
            Node first;
            Node next;

            Thread.MemoryBarrier(); // ensure m_first is fresh
            SpinWait spin = new SpinWait();
            do {
                first = m_first;
                // volatile read (acquire-fence)
                next = first.next;
                if (next == null) {
                    value = default(T);
                    return false;
                }

                if (first == Interlocked.CompareExchange(ref m_first, next, first))
                    // head successfully updated
                    break;

                // another reader moved the head first, back off and retry
                spin.SpinOnce();
            } while (true);

            // NOTE: the node which has been read becomes the new sentinel, therefore
            // it keeps referencing its value until the next successful dequeue.
            value = next.value;
            return true;
        }

        #region IEnumerable implementation

        /// <summary>
        /// Walks the linked list starting right after the sentinel node which
        /// was the head of the queue when the enumerator was created.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            // the sentinel, its value is already consumed (or is the initial
            // placeholder) and therefore it is never yielded
            readonly Node m_first;

            // the node backing Current, null while not positioned on an item
            Node m_current;

            // set when the end of the list is reached, cleared by Reset()
            bool m_finished;

            public Enumerator(Node first) {
                m_first = first;
            }

            /// <summary>Advances to the next item.</summary>
            /// <returns>
            /// <c>true</c> if the enumerator is positioned on an item; <c>false</c> once
            /// the end is reached and on every following call until <see cref="Reset"/>.
            /// </returns>
            public bool MoveNext() {
                if (m_finished)
                    return false;

                // before the first step continue from the sentinel, so the
                // sentinel itself is skipped
                var next = (m_current ?? m_first).next;
                if (next == null) {
                    m_current = null;
                    m_finished = true;
                    return false;
                }

                m_current = next;
                return true;
            }

            /// <summary>Returns the enumerator to the position before the first item.</summary>
            public void Reset() {
                m_current = null;
                m_finished = false;
            }

            /// <summary>Gets the item at the current position.</summary>
            /// <exception cref="InvalidOperationException">
            /// The enumerator is positioned before the first item or after the last one.
            /// </exception>
            public T Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            /// <summary>Nothing to release, the enumerator holds only node references.</summary>
            public void Dispose() {
            }
        }

        /// <summary>
        /// Returns an enumerator over the items which follow the current head of the queue.
        /// </summary>
        /// <remarks>
        /// The enumerator does not take a snapshot: it follows the live links of the list
        /// starting from the head captured at the moment of this call, items are not
        /// removed from the queue by the enumeration.
        /// </remarks>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}