view Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2

improved asyncpool usability working on batch operations on asyncqueue
author cin
date Mon, 12 Jan 2015 05:19:52 +0300
parents 2573b562e328
children 62d2f1e98c4e
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;

namespace Implab.Parallels {
    /// <summary>
    /// A FIFO queue stored as a singly linked list of fixed-size array chunks.
    /// Producers and consumers coordinate through <see cref="Interlocked"/>
    /// operations on the chunk counters and on the head/tail references
    /// instead of taking locks.
    /// </summary>
    /// <typeparam name="T">Type of the queued items.</typeparam>
    public class AsyncQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A fixed-size segment of the queue. Slots are handed out through
        /// <c>m_alloc</c>, published through <c>m_hi</c> and consumed through
        /// <c>m_low</c>.
        /// </summary>
        class Chunk {
            /// <summary>The chunk that follows this one, or <c>null</c>.</summary>
            public Chunk next;

            // index of the next slot to be dequeued
            int m_low;
            // number of slots whose values are committed and visible to readers
            int m_hi;
            // number of slots reserved by writers; TryEnqueue increments it
            // unconditionally, so it may grow beyond m_size
            int m_alloc;
            readonly int m_size;
            readonly T[] m_data;

            /// <summary>Creates an empty chunk with <paramref name="size"/> slots.</summary>
            public Chunk(int size) {
                m_size = size;
                m_data = new T[size];
            }

            /// <summary>
            /// Creates a chunk with <paramref name="size"/> slots whose first slot
            /// already holds <paramref name="value"/>.
            /// </summary>
            public Chunk(int size, T value) {
                m_size = size;
                m_hi = 1;
                m_alloc = 1;
                m_data = new T[size];
                m_data[0] = value;
            }

            /// <summary>Index of the next slot to be dequeued.</summary>
            public int Low {
                get { return m_low; }
            }

            /// <summary>Number of committed slots (exclusive upper bound for readers).</summary>
            public int Hi {
                get { return m_hi; }
            }

            /// <summary>
            /// Tries to store <paramref name="value"/> in the next free slot.
            /// </summary>
            /// <param name="value">The value to store.</param>
            /// <param name="extend">
            /// Set to <c>true</c> only for the call that reserved the index equal to
            /// the chunk size, i.e. the first call that found the chunk full.
            /// </param>
            /// <returns><c>true</c> if the value was stored; <c>false</c> if the chunk is full.</returns>
            public bool TryEnqueue(T value,out bool extend) {
                // reserve a slot; the counter keeps growing even when the chunk is full
                var alloc = Interlocked.Increment(ref m_alloc) - 1;

                if (alloc >= m_size) {
                    extend = alloc == m_size;
                    return false;
                }

                extend = false;
                m_data[alloc] = value;

                // publish the slot: m_hi advances only in reservation order, so wait
                // until all earlier reservations are committed
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>
            /// Tries to take the value at the current low index.
            /// </summary>
            /// <param name="value">The dequeued value, or <c>default(T)</c> on failure.</param>
            /// <param name="recycle">
            /// Set to <c>true</c> when the last slot of the chunk has been taken by this
            /// call, or when the call failed because every slot was already taken.
            /// </param>
            /// <returns><c>true</c> if a value was dequeued.</returns>
            public bool TryDequeue(out T value,out bool recycle) {
                int low;
                do {
                    low = m_low;
                    if (low >= m_hi) {
                        // nothing committed beyond the low index
                        value = default(T);
                        recycle = (low == m_size);
                        return false;
                    }
                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));

                recycle = (low == m_size - 1);
                value = m_data[low];

                return true;
            }

            /// <summary>
            /// Tries to copy up to <paramref name="length"/> items from
            /// <paramref name="batch"/> into the free slots of this chunk.
            /// </summary>
            /// <param name="batch">Source array.</param>
            /// <param name="offset">Index in <paramref name="batch"/> of the first item to copy.</param>
            /// <param name="length">Maximum number of items to copy.</param>
            /// <param name="enqueued">Number of items actually copied; may be less than <paramref name="length"/>.</param>
            /// <param name="extend">
            /// Set to <c>true</c> when the reservation counter was exactly at the chunk
            /// size. Unlike <see cref="TryEnqueue"/>, this call does not move the counter
            /// in that case, so several callers may observe <c>true</c>.
            /// </param>
            /// <returns><c>true</c> if the (possibly partial) batch was stored.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="batch"/> is <c>null</c>.</exception>
            /// <exception cref="ArgumentOutOfRangeException">
            /// <paramref name="offset"/> or <paramref name="length"/> is negative, or the
            /// range does not fit into <paramref name="batch"/>.
            /// </exception>
            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
                // validate before touching m_alloc: a failure after the reservation
                // would leave slots that are never committed
                if (batch == null)
                    throw new ArgumentNullException("batch");
                if (offset < 0)
                    throw new ArgumentOutOfRangeException("offset");
                if (length < 0 || offset > batch.Length - length)
                    throw new ArgumentOutOfRangeException("length");

                int alloc;
                int allocSize;

                do {
                    alloc = m_alloc;

                    if (alloc > m_size) {
                        enqueued = 0;
                        extend = false;
                        return false;
                    }

                    // size the reservation from the same snapshot the CAS validates
                    allocSize = Math.Min(m_size - alloc, length);
                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));

                if (alloc == m_size) {
                    enqueued = 0;
                    extend = true;
                    return false;
                }

                Array.Copy(batch, offset, m_data, alloc, allocSize);
                enqueued = allocSize;
                extend = false;

                // publish the whole range once all earlier reservations are committed
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>Returns the value stored at <paramref name="pos"/> without any bounds or commit checks of its own.</summary>
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        /// <summary>Number of slots in every chunk allocated by the queue.</summary>
        public const int DEFAULT_CHUNK_SIZE = 32;

        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

        // head and tail of the chunk list; both become null when the only
        // remaining chunk is recycled
        Chunk m_first;
        Chunk m_last;

        /// <summary>Creates an empty queue with a single preallocated chunk.</summary>
        public AsyncQueue() {
            m_last = m_first = new Chunk(m_chunkSize);
        }

        /// <summary>
        /// Appends <paramref name="value"/> to the end of the queue, allocating a new
        /// chunk when the last one is full or the queue has no chunks.
        /// </summary>
        /// <param name="value">The value to append.</param>
        public void Enqueue(T value) {
            var last = m_last;
            // spin wait to the new chunk
            bool extend = true;
            while(last == null || !last.TryEnqueue(value, out extend)) {
                // try to extend queue
                if (extend || last == null) {
                    // the new chunk already carries the value, so linking it
                    // completes the operation
                    var chunk = new Chunk(m_chunkSize, value);
                    if (EnqueueChunk(last, chunk))
                        break;
                    last = m_last;
                } else {
                    // the chunk is full and extending it is another caller's job:
                    // refresh the tail snapshot and retry
                    while (last != m_last) {
                        Thread.MemoryBarrier();
                        last = m_last;
                    }
                }
            }
        }

        /// <summary>
        /// Tries to remove the item at the head of the queue.
        /// </summary>
        /// <param name="value">The removed item, or <c>default(T)</c> when nothing was removed.</param>
        /// <returns><c>true</c> if an item was removed; <c>false</c> if no item was available.</returns>
        public bool TryDequeue(out T value) {
            var chunk = m_first;
            bool recycle;
            while (chunk != null) {

                var result = chunk.TryDequeue(out value, out recycle);

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else
                    return result; // this chunk is usable and returned actual result

                if (result) // this chunk is waste but the true result is always actual
                    return true;

                // try again
                chunk = m_first;
            }

            // the queue is empty
            value = default(T);
            return false;
        }

        /// <summary>
        /// Makes <paramref name="chunk"/> the new tail if the tail is still
        /// <paramref name="last"/>, then links it after <paramref name="last"/> or,
        /// when there was no tail, installs it as the head.
        /// </summary>
        /// <returns><c>false</c> if the tail had already been changed by someone else.</returns>
        bool EnqueueChunk(Chunk last, Chunk chunk) {
            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
                return false;

            if (last != null)
                last.next = chunk;
            else
                m_first = chunk;
            return true;
        }

        /// <summary>
        /// Unlinks the exhausted chunk <paramref name="first"/> from the head of the
        /// list. Gives up silently when a concurrent change is detected.
        /// </summary>
        void RecycleFirstChunk(Chunk first) {
            var next = first.next;

            if (next == null) {
                // looks like this is the last chunk
                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                    // race
                    // maybe someone already recycled this chunk
                    // or a new chunk has been appended to the queue

                    return; // give up
                }
                // the tail is updated
            }

            // we need to update the head
            Interlocked.CompareExchange(ref m_first, next, first);
            // if the head is already updated then give up
            return;

        }

        #region IEnumerable implementation

        /// <summary>
        /// Walks the chunk list from a given chunk without removing items. It reads
        /// the live chunk counters, so it is not a snapshot of the queue.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            Chunk m_current;
            // -1 until the first MoveNext call
            int m_pos = -1;

            public Enumerator(Chunk first) {
                m_current = first;
            }

            #region IEnumerator implementation

            /// <summary>
            /// Advances to the next committed item, following chunk links as needed.
            /// </summary>
            /// <returns>
            /// <c>true</c> if <see cref="Current"/> now refers to an item; <c>false</c>
            /// when there are no more items.
            /// </returns>
            public bool MoveNext() {
                if (m_current == null)
                    return false;

                if (m_pos == -1)
                    m_pos = m_current.Low;
                else
                    m_pos++;

                // skip exhausted or empty chunks; stop at the end of the list
                while (m_pos >= m_current.Hi) {
                    m_current = m_current.next;
                    if (m_current == null)
                        return false;
                    m_pos = m_current.Low;
                }

                return true;
            }

            /// <summary>Not supported.</summary>
            /// <exception cref="NotSupportedException">Always thrown.</exception>
            public void Reset() {
                throw new NotSupportedException();
            }

            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            /// <summary>The item at the current position.</summary>
            /// <exception cref="InvalidOperationException">
            /// The enumerator is positioned before the first item or after the last one.
            /// </exception>
            public T Current {
                get {
                    if (m_pos == -1 || m_current == null)
                        throw new InvalidOperationException();
                    return m_current.GetAt(m_pos);
                }
            }

            #endregion
        }

        /// <summary>Returns an enumerator that starts at the current head chunk.</summary>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}