view Implab/Parallels/MTQueue.cs @ 209:a867536c68fc v2

Bound promise to CancellationToken. Added new states to ExecutionState enum. Added Safe.Guard() method to handle cleanup of the result of the promise.
author cin
date Wed, 16 Nov 2016 03:06:08 +0300
parents 4c945d94b9ab
children
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    /// <summary>
    /// A first-in/first-out queue stored as a singly linked list of nodes.
    /// The head (<c>m_first</c>) and tail (<c>m_last</c>) references are updated
    /// with <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/> rather than
    /// under a lock.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the class name and the use of interlocked operations suggest
    /// it is meant to be shared between several producer and consumer threads;
    /// confirm against the callers before relying on a specific guarantee.
    /// Enumerating the queue does not remove elements.
    /// </remarks>
    /// <typeparam name="T">Type of the stored elements.</typeparam>
    public class MTQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A list cell: an immutable value plus a link to the node enqueued after it.
        /// </summary>
        class Node {
            public Node(T value) {
                this.value = value;
            }
            public readonly T value;
            // Assigned by Enqueue once this node has been replaced as the tail.
            public Node next;
        }

        // Head of the list (next node to dequeue); null when the queue is empty.
        Node m_first;
        // Tail of the list (most recently enqueued node); null when the queue is empty.
        Node m_last;

        /// <summary>
        /// Appends <paramref name="value"/> to the end of the queue.
        /// </summary>
        /// <remarks>
        /// The new node is first installed as the tail with a compare-and-swap on
        /// <c>m_last</c>; only afterwards is it linked from the previous tail, or
        /// published as the head when the queue was empty. Between these two steps
        /// the node is already the tail but not yet reachable from the head.
        /// </remarks>
        /// <param name="value">The element to append.</param>
        public void Enqueue(T value) {
            // Full fence before sampling the tail.
            Thread.MemoryBarrier();

            var last = m_last;
            var next = new Node(value);

            // Interlocked.CompareExchange implies Thread.MemoryBarrier();
            // to ensure that the next node is completely constructed
            // Retry with a freshly read tail until the swap succeeds.
            while (last != Interlocked.CompareExchange(ref m_last, next, last))
                last = m_last;

            if (last != null)
                // there was a tail: link the new node after it
                last.next = next;
            else
                // the queue was empty: the new node is also the head
                m_first = next;
        }

        /// <summary>
        /// Tries to remove the element at the head of the queue.
        /// </summary>
        /// <param name="value">
        /// Receives the removed element, or <c>default(T)</c> when nothing was removed.
        /// </param>
        /// <returns>
        /// <c>true</c> if an element was removed; <c>false</c> if the queue was
        /// observed to be empty.
        /// </returns>
        public bool TryDequeue(out T value) {
            Node first;
            Node next;
            value = default(T);

            Thread.MemoryBarrier();
            do {
                first = m_first;
                if (first == null)
                    return false;
                next = first.next;
                if (next == null) {
                    // this is the last element,
                    // then try to update the tail
                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                        // this is the race condition
                        if (m_last == null)
                            // the queue is empty
                            return false;
                        // tail has been changed, we need to restart
                        continue;
                    }

                    // tail successfully updated and first.next will never be changed
                    // other readers will fail due to inconsistency m_last != m_first && m_first.next == null
                    // however the parallel writer may update the m_first since the m_last is null

                    // so we need to fix the inconsistency by setting m_first to null, or if it has
                    // already been updated by the writer then we should just give up
                    Interlocked.CompareExchange(ref m_first, null, first);
                    break;

                }
                if (first == Interlocked.CompareExchange(ref m_first, next, first))
                    // head successfully updated
                    break;
            } while (true);

            value = first.value;
            return true;
        }

        #region IEnumerable implementation

        /// <summary>
        /// Walks the node chain starting from the head captured at construction.
        /// </summary>
        /// <remarks>
        /// The enumerator follows the <c>next</c> links as they are at the time of
        /// each <see cref="MoveNext"/> call; it never modifies the queue.
        /// </remarks>
        class Enumerator : IEnumerator<T> {
            // Head node captured when the enumerator was created.
            readonly Node m_first;
            // Node the enumerator is positioned on; null before the first
            // element and after the last one.
            Node m_current;
            // True once MoveNext has been called since construction or Reset.
            // Distinguishes "before the first element" from "past the end",
            // which both have m_current == null.
            bool m_started;

            public Enumerator(Node first) {
                m_first = first;
            }

            #region IEnumerator implementation

            /// <summary>
            /// Advances to the next node.
            /// </summary>
            /// <returns>
            /// <c>true</c> if the enumerator is positioned on an element;
            /// <c>false</c> once the end has been passed. After returning
            /// <c>false</c> it keeps returning <c>false</c> until
            /// <see cref="Reset"/> is called.
            /// </returns>
            public bool MoveNext() {
                if (!m_started) {
                    m_started = true;
                    m_current = m_first;
                } else if (m_current != null) {
                    m_current = m_current.next;
                }
                // else: already past the end, stay there instead of restarting
                return m_current != null;
            }

            /// <summary>
            /// Repositions the enumerator before the captured head node.
            /// </summary>
            public void Reset() {
                m_current = null;
                m_started = false;
            }

            /// <summary>
            /// Non-generic view of <see cref="Current"/>.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// The enumerator is not positioned on an element.
            /// </exception>
            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            #endregion

            #region IDisposable implementation

            /// <summary>
            /// Does nothing; the enumerator holds no resources to release.
            /// </summary>
            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            /// <summary>
            /// The value of the node the enumerator is positioned on.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// The enumerator is before the first element or past the last one.
            /// </exception>
            public T Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            #endregion
        }

        /// <summary>
        /// Returns an enumerator that starts at the node which is the head of the
        /// queue at the moment of the call.
        /// </summary>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        /// <summary>
        /// Non-generic counterpart of <see cref="GetEnumerator"/>.
        /// </summary>
        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}