view Implab/Parallels/MTQueue.cs @ 81:2c5631b43c7d v2

dispatch pool rewritten
author cin
date Fri, 26 Sep 2014 20:44:01 +0400
parents 4f20870d0816
children dc4942d09e74
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// Singly linked FIFO queue: values are appended at the tail (<c>m_last</c>) and
    /// removed from the head (<c>m_first</c>). Both references are switched with
    /// <see cref="Interlocked.CompareExchange{T}(ref T, T, T)"/> instead of a lock.
    /// </summary>
    /// <remarks>
    /// An empty queue is represented by <c>m_first == null</c> and <c>m_last == null</c>.
    /// The two references are not updated atomically together, so short-lived
    /// intermediate states exist (a new tail that is not yet linked to the list, a head
    /// that has already been taken); <see cref="TryDequeue"/> detects them and retries or
    /// reports an empty queue.
    /// NOTE(review): the class looks designed for several concurrent readers and writers;
    /// the fields are not declared volatile, so confirm the memory-model assumptions
    /// before relying on it on weakly ordered hardware.
    /// </remarks>
    /// <typeparam name="T">Type of the stored values.</typeparam>
    public class MTQueue<T> {
        // List node. The value is fixed at construction; 'next' is assigned by Enqueue
        // only after the node that precedes it has been replaced as the tail.
        class Node {
            public Node(T value) {
                this.value = value;
            }
            public readonly T value;
            public Node next;
        }

        // Head of the list: the node the next TryDequeue will try to take, or null.
        Node m_first;
        // Tail of the list: the node the next Enqueue will append after, or null when empty.
        Node m_last;

        /// <summary>
        /// Appends <paramref name="value"/> to the end of the queue.
        /// </summary>
        /// <remarks>
        /// The new node becomes the tail first and is linked to the list afterwards, so
        /// for a moment it is the tail without being reachable from the head.
        /// </remarks>
        /// <param name="value">The value to store.</param>
        public void Enqueue(T value) {
            // full fence before the first (plain) read of m_last
            Thread.MemoryBarrier();

            var last = m_last;
            var next = new Node(value);

            // try to replace the observed tail with the new node; if some other thread
            // has changed m_last in the meantime, re-read it and try again
            while (last != Interlocked.CompareExchange(ref m_last, next, last))
                last = m_last;

            // here 'last' is the tail that was replaced by this call
            if (last != null)
                // link the previous tail to the new node, this makes it reachable for readers
                last.next = next;
            else
                // the queue was empty (the tail was null), the new node is also the head
                m_first = next;
        }

        /// <summary>
        /// Tries to remove the value at the head of the queue.
        /// </summary>
        /// <remarks>
        /// The method may also return <c>false</c> while an <see cref="Enqueue"/> into an
        /// empty queue is in progress, i.e. after the writer has switched the tail but
        /// before it has stored the head. When a writer has switched the tail but not yet
        /// linked the new node, the loop retries immediately (busy-wait) until the link
        /// becomes visible.
        /// </remarks>
        /// <param name="value">
        /// Receives the removed value, or <c>default(T)</c> when nothing was removed.
        /// </param>
        /// <returns><c>true</c> if a value was removed; <c>false</c> if the queue was found empty.</returns>
        public bool TryDequeue(out T value) {
            Node first;
            // the initial null is never observed, 'next' is assigned before it is read
            Node next = null;
            // the out parameter must be assigned on every return path
            value = default(T);

            // full fence before the first (plain) read of m_first
            Thread.MemoryBarrier();
            do {
                first = m_first;
                if (first == null)
                    // no head: nothing to take
                    return false;
                next = first.next;
                if (next == null) {
                    // this is the last element,
                    // then try to update the tail
                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                        // this is the race condition: the tail is not the node we consider
                        // to be the only element
                        if (m_last == null)
                            // the queue is empty
                            return false;
                        // tail has been changed, we need to restart
                        // ('continue' jumps to the 'while (true)' check, so m_first is re-read)
                        continue; 
                    }

                    // tail successfully updated and first.next will never be changed
                    // other readers will fail due to inconsistency m_last != m_first && m_first.next == null
                    // however the parallel writer may update the m_first since the m_last is null

                    // so we need to fix inconsistency by setting m_first to null or if it has been
                    // updated by the writer already then we should just give up
                    // (the result is ignored on purpose: either outcome leaves a valid head)
                    Interlocked.CompareExchange(ref m_first, null, first);
                    break;

                } else {
                    // there is a successor: advance the head to it, but only if no other
                    // reader has moved the head since we read it; otherwise retry
                    if (first == Interlocked.CompareExchange(ref m_first, next, first))
                        // head successfully updated
                        break;
                }
            } while (true);

            // 'first' has been unlinked by this call, its value belongs to the caller
            value = first.value;
            return true;
        }
    }
}