view Implab/Parallels/AsyncQueue.cs @ 187:dd4a3590f9c6 ref20160224

Reworked cancellation handling: if the cancel handler isn't specified, the OperationCanceledException will be handled by the error handler. Any unhandled OperationCanceledException will cause the promise cancellation.
author cin
date Tue, 19 Apr 2016 17:35:20 +0300
parents 238e15580926
children 8d5de4eb9c2c
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
using System.Diagnostics;

namespace Implab.Parallels {
    public class AsyncQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A fixed-size segment of the queue storage. Chunks form a singly linked
        /// list through <see cref="next"/>; writers reserve slots with interlocked
        /// operations on <c>m_alloc</c> and publish them by advancing <c>m_hi</c>,
        /// readers take elements by advancing <c>m_low</c>.
        /// </summary>
        class Chunk {
            // the chunk which follows this one in the queue, null if there is none (yet)
            public Chunk next;

            // the index of the next element to be dequeued
            int m_low;
            // the index past the last element published to the readers
            int m_hi;
            // the index of the next slot to be reserved by a writer,
            // it may grow beyond m_size when the chunk is full or sealed
            int m_alloc;
            // the capacity of the chunk
            readonly int m_size;
            readonly T[] m_data;

            /// <summary>
            /// Creates an empty chunk with the specified capacity.
            /// </summary>
            /// <param name="size">The capacity of the chunk.</param>
            public Chunk(int size) {
                m_size = size;
                m_data = new T[size];
            }

            /// <summary>
            /// Creates a chunk with the specified capacity which already contains one element.
            /// </summary>
            /// <param name="size">The capacity of the chunk.</param>
            /// <param name="value">The element stored in the first slot.</param>
            public Chunk(int size, T value) {
                m_size = size;
                m_hi = 1;
                m_alloc = 1;
                m_data = new T[size];
                m_data[0] = value;
            }

            /// <summary>
            /// Creates a chunk with the specified capacity and fills it with the data copied from the buffer.
            /// </summary>
            /// <param name="size">The capacity of the chunk.</param>
            /// <param name="data">The buffer to copy the elements from.</param>
            /// <param name="offset">The offset of the first element in the buffer.</param>
            /// <param name="length">The number of elements to copy, they become readable immediately.</param>
            /// <param name="alloc">The initial value of the reservation counter. A value greater than
            /// <paramref name="size"/> makes other writers fail without being asked to extend the queue.</param>
            public Chunk(int size, T[] data, int offset, int length, int alloc) {
                m_size = size;
                m_hi = length;
                m_alloc = alloc;
                m_data = new T[size];
                Array.Copy(data, offset, m_data, 0, length);
            }

            /// <summary>
            /// The index of the next element to be dequeued.
            /// </summary>
            public int Low {
                get { return m_low; }
            }

            /// <summary>
            /// The index past the last published element.
            /// </summary>
            public int Hi {
                get { return m_hi; }
            }

            /// <summary>
            /// The capacity of the chunk.
            /// </summary>
            public int Size {
                get { return m_size; }
            }

            /// <summary>
            /// Tries to append the value to the chunk.
            /// </summary>
            /// <returns><c>true</c> if the value has been stored and published, <c>false</c> if there was no free slot.</returns>
            /// <param name="value">The value to store.</param>
            /// <param name="extend"><c>true</c> only for the caller whose reservation landed exactly at the
            /// end of the chunk, i.e. the first one to discover that the chunk is full.</param>
            public bool TryEnqueue(T value, out bool extend) {
                // reserve a slot, the counter keeps growing even when the chunk is full
                var alloc = Interlocked.Increment(ref m_alloc) - 1;

                if (alloc >= m_size) {
                    extend = alloc == m_size;
                    return false;
                }

                extend = false;
                m_data[alloc] = value;

                // publish the slot: m_hi can be moved to alloc + 1 only after all preceding
                // slots have been published, i.e. when m_hi equals to alloc
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>
            /// Prevents from allocating new space in the chunk and waits for all write operations to complete
            /// </summary>
            public void Commit() {
                // m_size + 1 makes all further reservations fail without requesting an extension,
                // the previous value (capped by the capacity) is the number of the reserved slots
                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);

                // wait until every reserved slot is published
                while (m_hi != actual)
                    Thread.MemoryBarrier();
            }

            /// <summary>
            /// Tries to take one element from the chunk.
            /// </summary>
            /// <returns><c>true</c> if an element has been taken, <c>false</c> if there are no published elements left.</returns>
            /// <param name="value">The dequeued element, or the default value when nothing was taken.</param>
            /// <param name="recycle"><c>true</c> when this call has taken the last slot of the chunk, or when
            /// nothing was taken because every slot of the chunk has already been consumed.</param>
            public bool TryDequeue(out T value, out bool recycle) {
                int low;
                do {
                    low = m_low;
                    if (low >= m_hi) {
                        value = default(T);
                        recycle = (low == m_size);
                        return false;
                    }
                    // claim the slot, retry if another reader has moved m_low
                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));

                recycle = (low == m_size - 1);
                value = m_data[low];

                return true;
            }

            /// <summary>
            /// Tries to append a part of the buffer to the chunk.
            /// </summary>
            /// <returns><c>true</c> if at least one element has been stored and published.</returns>
            /// <param name="batch">The buffer with the data.</param>
            /// <param name="offset">The offset of the data in the buffer.</param>
            /// <param name="length">The number of elements the caller wants to store.</param>
            /// <param name="enqueued">The number of elements actually stored.</param>
            /// <param name="extend"><c>true</c> when not all elements fit in the chunk and the caller
            /// should extend the queue to store the rest.</param>
            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {

                // reserve the space for the whole batch, the counter may go beyond the capacity
                var alloc = Interlocked.Add(ref m_alloc, length) - length;
                if (alloc > m_size) {
                    // the chunk is full and someone already
                    // creating the new one
                    enqueued = 0; // nothing was added
                    extend = false; // the caller shouldn't try to extend the queue
                    return false; // nothing was added
                }

                // only the part which fits in the chunk is stored here
                enqueued = Math.Min(m_size - alloc, length);
                extend = length > enqueued;

                if (enqueued == 0)
                    return false;


                Array.Copy(batch, offset, m_data, alloc, enqueued);

                // publish the stored elements once all preceding slots are published
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
                    // spin wait for commit
                }

                return true;
            }

            /// <summary>
            /// Tries to take up to <paramref name="length"/> elements from the chunk.
            /// </summary>
            /// <returns><c>true</c> if a range of the chunk has been claimed and copied to the buffer,
            /// <c>false</c> if there are no published elements left.</returns>
            /// <param name="buffer">The buffer to write the elements to.</param>
            /// <param name="offset">The offset in the buffer at which the elements are written.</param>
            /// <param name="length">The maximum number of elements to take.</param>
            /// <param name="dequeued">The number of elements actually taken.</param>
            /// <param name="recycle"><c>true</c> when this call has taken the last slots of the chunk, or when
            /// nothing was taken because every slot of the chunk has already been consumed.</param>
            public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
                int low, hi, batchSize;

                do {
                    low = m_low;
                    hi = m_hi;
                    if (low >= hi) {
                        dequeued = 0;
                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
                        return false;
                    }
                    batchSize = Math.Min(hi - low, length);
                    // claim the range, retry if another reader has moved m_low
                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));

                recycle = (low == m_size - batchSize);
                dequeued = batchSize;

                Array.Copy(m_data, low, buffer, offset, batchSize);

                return true;
            }

            /// <summary>
            /// Returns the element stored at the specified position, the position isn't
            /// checked against <see cref="Low"/> and <see cref="Hi"/>.
            /// </summary>
            /// <param name="pos">The index of the slot.</param>
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        /// <summary>
        /// The capacity of the chunks created by <see cref="Enqueue"/>, <see cref="Clear"/> and <see cref="Drain"/>,
        /// and the minimum capacity of the chunks created by <see cref="EnqueueRange"/>.
        /// </summary>
        public const int DEFAULT_CHUNK_SIZE = 32;
        /// <summary>
        /// The maximum number of elements which <see cref="EnqueueRange"/> places in a single new chunk.
        /// </summary>
        public const int MAX_CHUNK_SIZE = 262144;

        // the head of the queue, the chunk to dequeue from; null when the queue has no chunks
        Chunk m_first;
        // the tail of the queue, the chunk to enqueue to; null when the queue has no chunks
        Chunk m_last;

        /// <summary>
        /// Adds the specified value to the queue.
        /// </summary>
        /// <remarks>
        /// The value is stored in the tail chunk. When the tail chunk is full (or the
        /// queue has no chunks) a new chunk which already contains the value is appended.
        /// </remarks>
        /// <param name="value">The value which will be added to the queue.</param>
        public virtual void Enqueue(T value) {
            var last = m_last;
            // spin wait to the new chunk
            bool extend = true;
            while (last == null || !last.TryEnqueue(value, out extend)) {
                // try to extend queue
                if (extend || last == null) {
                    // the new chunk is created with the value inside, therefore
                    // the successful append of the chunk completes the operation
                    var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
                    if (EnqueueChunk(last, chunk))
                        break; // success! exit!
                    // the tail has been changed by someone else, retry with the actual tail
                    last = m_last;
                } else {
                    // the chunk is full and someone else is responsible for the extension,
                    // wait until the tail is replaced
                    while (last == m_last) {
                        Thread.MemoryBarrier();
                    }
                    last = m_last;
                }
            }
        }

        /// <summary>
        /// Adds the specified data to the queue.
        /// </summary>
        /// <remarks>
        /// The data is first written to the free space of the tail chunk, the rest is
        /// placed in new chunks, each holding at most <see cref="MAX_CHUNK_SIZE"/> elements.
        /// An empty range (<paramref name="length"/> is zero) is accepted and ignored.
        /// </remarks>
        /// <param name="data">The buffer which contains the data to be enqueued.</param>
        /// <param name="offset">The offset of the data in the buffer.</param>
        /// <param name="length">The size of the data to read from the buffer.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is negative or the range doesn't fit in the buffer.</exception>
        public virtual void EnqueueRange(T[] data, int offset, int length) {
            if (data == null)
                throw new ArgumentNullException("data");
            if (length == 0)
                return;
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // compare the length with the remaining space instead of calculating offset + length:
            // the sum may overflow and pass the check, in that case the slots would be reserved
            // in the chunk before Array.Copy fails and never published, blocking other writers
            if (length < 1 || length > data.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var last = m_last;

            bool extend;
            int enqueued;

            while (length > 0) {
                // when there are no chunks in the queue we need to create one
                extend = true;
                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
                    length -= enqueued;
                    offset += enqueued;
                }

                if (extend) {
                    // there was no enough space in the chunk
                    // or there was no chunks in the queue

                    while (length > 0) {

                        var size = Math.Min(length, MAX_CHUNK_SIZE);

                        // the reservation counter of the new chunk is set to the remaining length,
                        // if more chunks are required it exceeds the capacity and makes
                        // other writers wait until this operation appends the next chunk
                        var chunk = new Chunk(
                            Math.Max(size, DEFAULT_CHUNK_SIZE),
                            data,
                            offset,
                            size,
                            length // length >= size
                        );

                        if (!EnqueueChunk(last, chunk)) {
                            // looks like the queue has been updated then proceed from the beginning
                            last = m_last; 
                            break;
                        }

                        // we have successfully added the new chunk
                        last = chunk;
                        length -= size;
                        offset += size;
                    }
                } else {
                    // we don't need to extend the queue, if we successfully enqueued data
                    if (length == 0)
                        break;

                    // if we need to wait while someone is extending the queue
                    // spinwait
                    while (last == m_last) {
                        Thread.MemoryBarrier();
                    }

                    last = m_last;
                }
            }
        }

        /// <summary>
        /// Tries to retrieve the first element from the queue.
        /// </summary>
        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
        /// <param name="value">The value of the dequeued element, the default value if the queue is empty.</param>
        public bool TryDequeue(out T value) {
            var chunk = m_first;
            bool recycle;
            while (chunk != null) {

                var result = chunk.TryDequeue(out value, out recycle);

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else
                    return result; // this chunk is usable and returned actual result

                if (result) // this chunk is waste but the true result is always actual
                    return true;

                // the chunk was exhausted and gave nothing, re-read the head
                // which may have been replaced and try again
                chunk = m_first;
            }

            // the queue is empty
            value = default(T);
            return false;
        }

        /// <summary>
        /// Tries to dequeue the specified amount of data from the queue.
        /// </summary>
        /// <remarks>
        /// The data is collected from as many chunks as required until either
        /// <paramref name="length"/> elements are retrieved or the queue has no more data.
        /// </remarks>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of data to be retrieved.</param>
        /// <param name="dequeued">The actual amount of the retrieved data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is less than one or the range doesn't fit in the buffer.</exception>
        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // compare the length with the remaining space instead of calculating offset + length:
            // the sum may overflow and pass the check, in that case the elements would be
            // removed from the chunk before Array.Copy fails and therefore lost
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            bool recycle;
            dequeued = 0;
            while (chunk != null) {

                int actual;
                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                    offset += actual;
                    length -= actual;
                    dequeued += actual;
                }

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else if (actual == 0)
                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

                if (length == 0)
                    return true;

                // we still may dequeue something
                // try again
                chunk = m_first;
            }

            return dequeued != 0;
        }

        /// <summary>
        /// Tries to dequeue all remaining data in the first chunk.
        /// </summary>
        /// <remarks>
        /// Unlike <see cref="TryDequeueRange"/> the data is taken from a single chunk only,
        /// exhausted chunks at the head of the queue are skipped.
        /// </remarks>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of the data to be dequeued.</param>
        /// <param name="dequeued">The actual amount of the dequeued data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is less than one or the range doesn't fit in the buffer.</exception>
        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // compare the length with the remaining space instead of calculating offset + length:
            // the sum may overflow and pass the check, in that case the elements would be
            // removed from the chunk before Array.Copy fails and therefore lost
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            bool recycle;
            dequeued = 0;

            while (chunk != null) {

                int actual;
                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                    dequeued = actual;
                }

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);

                // if we have dequeued any data, then return
                if (dequeued != 0)
                    return true;

                // the chunk is usable but doesn't contain any data (it's the last chunk in the queue),
                // the queue is empty: report it instead of spinning on the same chunk until
                // someone enqueues new data
                if (!recycle)
                    return false;

                // the head chunk was exhausted, we still may dequeue something
                // from the next one, try again
                chunk = m_first;
            }

            return false;
        }

        /// <summary>
        /// Attempts to make <paramref name="chunk"/> the new tail of the queue, the attempt
        /// succeeds only if the current tail is still <paramref name="last"/>.
        /// </summary>
        /// <returns><c>true</c> if the chunk has been appended, <c>false</c> if the tail was changed by someone else.</returns>
        /// <param name="last">The tail observed by the caller, <c>null</c> if the queue had no chunks.</param>
        /// <param name="chunk">The chunk to append.</param>
        bool EnqueueChunk(Chunk last, Chunk chunk) {
            var observedTail = Interlocked.CompareExchange(ref m_last, chunk, last);

            if (observedTail == last) {
                // the tail is ours, now make the chunk reachable from the head side
                if (last == null)
                    m_first = chunk;
                else
                    last.next = chunk;

                return true;
            }

            // the tail has been replaced in the meantime
            return false;
        }

        /// <summary>
        /// Removes the exhausted chunk from the head of the queue.
        /// </summary>
        /// <param name="first">The chunk which the caller has observed as the head of the queue.</param>
        void RecycleFirstChunk(Chunk first) {
            var next = first.next;

            // move the head to the next chunk, if the head is already updated then give up
            if (first != Interlocked.CompareExchange(ref m_first, next, first))
                return;

            if (next == null) {
                // the removed chunk had no successor when we looked at it,
                // so it should be the tail too and the tail must be reset as well

                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                    // race
                    // someone already updated the tail, restore the pointer to the queue head
                    m_first = first;
                }
                // the tail is updated
            }

        }

        /// <summary>
        /// Removes all elements from the queue by replacing its chunks with a new empty chunk.
        /// Does nothing when the queue has no chunks.
        /// </summary>
        public void Clear() {
            // start the new queue
            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);

            do {
                Thread.MemoryBarrier();
                var first = m_first;
                var last = m_last;

                if (last == null) // nothing to clear
                    return;

                // the head and the tail are updated by separate operations, here we have
                // caught the queue in the middle of such update: retry
                if (first == null || (first.next == null && first != last)) // inconsistency
                    continue;

                // here we will create inconsistency which will force others to spin
                // and prevent from fetching. chunk.next = null
                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
                    continue;// inconsistent

                // the head is replaced, complete the operation by replacing the tail
                m_last = chunk;

                return;

            } while(true);
        }

        /// <summary>
        /// Detaches all chunks from the queue, replacing them with a new empty chunk,
        /// and returns the elements read from the detached chunks.
        /// </summary>
        /// <returns>The elements read from the detached chunks, an empty array if the queue has no chunks.</returns>
        public T[] Drain() {
            // start the new queue
            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);

            do {
                Thread.MemoryBarrier();
                var first = m_first;
                var last = m_last;

                if (last == null)
                    return new T[0];

                // the head and the tail are updated by separate operations, here we have
                // caught the queue in the middle of such update: retry
                if (first == null || (first.next == null && first != last))
                    continue;

                // here we will create inconsistency which will force others to spin
                // and prevent from fetching. chunk.next = null
                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
                    continue;// inconsistent

                // take the actual tail: it may have been moved since we read it above
                last = Interlocked.Exchange(ref m_last, chunk);

                return ReadChunks(first, last);

            } while(true);
        }
            
        /// <summary>
        /// Collects the elements of the chunk list starting from <paramref name="chunk"/>
        /// up to and including <paramref name="last"/>.
        /// </summary>
        /// <returns>The array of the collected elements.</returns>
        /// <param name="chunk">The first chunk to read.</param>
        /// <param name="last">The chunk at which the reading stops.</param>
        static T[] ReadChunks(Chunk chunk, object last) {
            var items = new List<T>();
            var scratch = new T[DEFAULT_CHUNK_SIZE];

            for (var current = chunk; current != null;) {
                // seal the chunk and wait for the pending writes
                current.Commit();

                // the elements are taken with the regular dequeue operation since
                // some clients may still be completing their own dequeue on this chunk
                int count;
                bool exhausted;
                while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out exhausted))
                    items.AddRange(new ArraySegmentCollection(scratch, 0, count));

                if (current == last)
                    break;

                // the link to the next chunk may not be set yet, wait for it
                while (current.next == null)
                    Thread.MemoryBarrier();

                current = current.next;
            }

            return items.ToArray();
        }

        /// <summary>
        /// A read-only collection view over a segment of an array. It allows passing
        /// a part of a buffer to the methods which accept a collection, the data
        /// is transferred with <see cref="CopyTo"/>.
        /// </summary>
        struct ArraySegmentCollection : ICollection<T> {
            readonly T[] m_data;
            readonly int m_offset;
            readonly int m_length;

            /// <summary>
            /// Creates the view over the specified segment, the arguments aren't validated.
            /// </summary>
            /// <param name="data">The underlying array.</param>
            /// <param name="offset">The index of the first element of the segment.</param>
            /// <param name="length">The number of elements in the segment.</param>
            public ArraySegmentCollection(T[] data, int offset, int length) {
                m_data = data;
                m_offset = offset;
                m_length = length;
            }

            #region ICollection implementation

            /// <summary>Not supported, the collection is read-only.</summary>
            public void Add(T item) {
                throw new NotSupportedException();
            }

            /// <summary>Not supported, the collection is read-only.</summary>
            public void Clear() {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Checks whether the segment contains the specified item.
            /// </summary>
            /// <returns><c>true</c> if the item is found within the segment, <c>false</c> otherwise.</returns>
            public bool Contains(T item) {
                // search only inside the segment, an uninitialized view contains nothing
                return m_data != null && Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
            }

            /// <summary>
            /// Copies the elements of the segment to the specified array.
            /// </summary>
            /// <param name="array">The destination array.</param>
            /// <param name="arrayIndex">The index in the destination at which the copying begins.</param>
            public void CopyTo(T[] array, int arrayIndex) {
                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
            }

            /// <summary>Not supported, the collection is read-only.</summary>
            public bool Remove(T item) {
                throw new NotSupportedException();
            }

            /// <summary>The number of elements in the segment.</summary>
            public int Count {
                get {
                    return m_length;
                }
            }

            /// <summary>Always <c>true</c>.</summary>
            public bool IsReadOnly {
                get {
                    return true;
                }
            }

            #endregion

            #region IEnumerable implementation

            /// <summary>
            /// Enumerates the elements of the segment in the order of their indexes.
            /// </summary>
            public IEnumerator<T> GetEnumerator() {
                for (int i = m_offset; i < m_length + m_offset; i++)
                    yield return m_data[i];
            }

            #endregion

            #region IEnumerable implementation

            IEnumerator IEnumerable.GetEnumerator() {
                return GetEnumerator();
            }

            #endregion
        }

        #region IEnumerable implementation

        /// <summary>
        /// Forward-only enumerator which walks through the chunks starting from the one
        /// passed to the constructor. The elements are read in place and aren't dequeued.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            // the chunk being enumerated, null when the enumeration is over
            Chunk m_current;
            // the position inside the current chunk, -1 until the first MoveNext call
            int m_pos = -1;

            /// <summary>
            /// Creates the enumerator.
            /// </summary>
            /// <param name="first">The chunk to start from, <c>null</c> gives an empty sequence.</param>
            public Enumerator(Chunk first) {
                m_current = first;
            }

            #region IEnumerator implementation

            /// <summary>
            /// Advances to the next published element.
            /// </summary>
            /// <returns><c>true</c> if there is an element to read, <c>false</c> when the enumeration is over.</returns>
            public bool MoveNext() {
                if (m_current == null)
                    return false;

                if (m_pos == -1)
                    m_pos = m_current.Low;
                else
                    m_pos++;

                // the position has reached the end of the published data
                while (m_pos >= m_current.Hi) {
                    // only a completely filled chunk can be followed by more data
                    m_current = m_pos >= m_current.Size ? m_current.next : null;

                    if (m_current == null) {
                        m_pos = 0;
                        return false;
                    }

                    // start from the first element which isn't dequeued yet, the loop
                    // condition verifies that the chunk actually has an element there
                    m_pos = m_current.Low;
                }

                return true;
            }

            /// <summary>Not supported.</summary>
            public void Reset() {
                throw new NotSupportedException();
            }

            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            #endregion

            #region IDisposable implementation

            /// <summary>Does nothing, the enumerator holds no resources.</summary>
            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            /// <summary>
            /// The element at the current position.
            /// </summary>
            /// <exception cref="InvalidOperationException">The enumeration isn't started or is already over.</exception>
            public T Current {
                get {
                    if (m_pos == -1 || m_current == null)
                        throw new InvalidOperationException();
                    return m_current.GetAt(m_pos);
                }
            }

            #endregion
        }

        /// <summary>
        /// Returns the enumerator which starts from the current head of the queue,
        /// the enumeration doesn't remove the elements.
        /// </summary>
        public IEnumerator<T> GetEnumerator() {
            var head = m_first;
            IEnumerator<T> enumerator = new Enumerator(head);
            return enumerator;
        }

        #endregion

        #region IEnumerable implementation

        // the non-generic enumeration is served by the generic enumerator
        IEnumerator IEnumerable.GetEnumerator() {
            IEnumerator<T> typed = GetEnumerator();
            return typed;
        }

        #endregion
    }
}