view Implab/Parallels/MTQueue.cs @ 121:62d2f1e98c4e v2

working version of AsyncQueue and batch operations tests
author cin
date Mon, 12 Jan 2015 18:19:41 +0300
parents 4c945d94b9ab
children
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    public class MTQueue<T> : IEnumerable<T> {
        class Node {
            public Node(T value) {
                this.value = value;
            }
            public readonly T value;
            public Node next;
        }

        Node m_first;
        Node m_last;

        public void Enqueue(T value) {
            Thread.MemoryBarrier();

            var last = m_last;
            var next = new Node(value);

            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
            // to ensure that the next node is completely constructed
            while (last != Interlocked.CompareExchange(ref m_last, next, last))
                last = m_last;

            if (last != null)
                last.next = next;
            else
                m_first = next;
        }

        public bool TryDequeue(out T value) {
            Node first;
            Node next;
            value = default(T);

            Thread.MemoryBarrier();
            do {
                first = m_first;
                if (first == null)
                    return false;
                next = first.next;
                if (next == null) {
                    // this is the last element,
                    // then try to update the tail
                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                        // this is the race condition
                        if (m_last == null)
                            // the queue is empty
                            return false;
                        // tail has been changed, we need to restart
                        continue; 
                    }

                    // tail succesfully updated and first.next will never be changed
                    // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null
                    // however the parallel writer may update the m_first since the m_last is null

                    // so we need to fix inconsistency by setting m_first to null or if it has been
                    // updated by the writer already then we should just to give up
                    Interlocked.CompareExchange(ref m_first, null, first);
                    break;

                }
                if (first == Interlocked.CompareExchange(ref m_first, next, first))
                    // head succesfully updated
                    break;
            } while (true);

            value = first.value;
            return true;
        }

        #region IEnumerable implementation

        class Enumerator : IEnumerator<T> {
            Node m_current;
            Node m_first;

            public Enumerator(Node first) {
                m_first = first;
            }

            #region IEnumerator implementation

            public bool MoveNext() {
                m_current = m_current == null ? m_first : m_current.next;
                return m_current != null;
            }

            public void Reset() {
                m_current = null;
            }

            object IEnumerator.Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            public T Current {
                get {
                    if (m_current == null)
                        throw new InvalidOperationException();
                    return m_current.value;
                }
            }

            #endregion
        }

        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}