view Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2

major update, added Drain method to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents f4d6ea6969cc
children f803565868a4
line wrap: on
line source

using System.Threading;
using System.Collections.Generic;
using System;
using System.Collections;
using System.Diagnostics;

namespace Implab.Parallels {
    public class AsyncQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A fixed-capacity segment of the queue storage. Chunks are linked
        /// into a list through the <c>next</c> field.
        /// </summary>
        class Chunk {
            // the chunk which follows this one, assigned by the owning queue when it is extended
            public Chunk next;

            // index of the next element to be dequeued
            int m_low;
            // upper bound of the published elements, elements in [m_low, m_hi) can be dequeued
            int m_hi;
            // number of slots claimed by writers, may exceed m_size when the chunk is full
            int m_alloc;
            // capacity of the chunk
            readonly int m_size;
            readonly T[] m_data;

            /// <summary>
            /// Creates an empty chunk with the specified capacity.
            /// </summary>
            public Chunk(int size) {
                m_size = size;
                m_data = new T[size];
            }

            /// <summary>
            /// Creates a chunk with the specified capacity which already contains a single value.
            /// </summary>
            public Chunk(int size, T value) {
                m_size = size;
                m_hi = 1;
                m_alloc = 1;
                m_data = new T[size];
                m_data[0] = value;
            }

            /// <summary>
            /// Creates a chunk with the specified capacity and fills it with a copy of the specified data.
            /// </summary>
            /// <param name="size">The capacity of the chunk.</param>
            /// <param name="data">The source buffer.</param>
            /// <param name="offset">The offset of the data in the source buffer.</param>
            /// <param name="length">The number of elements to copy, they become available for dequeuing.</param>
            /// <param name="alloc">The initial value of the allocation counter.</param>
            public Chunk(int size, T[] data, int offset, int length, int alloc) {
                m_size = size;
                m_hi = length;
                m_alloc = alloc;
                m_data = new T[size];
                Array.Copy(data, offset, m_data, 0, length);
            }

            /// <summary>
            /// The index of the next element to be dequeued.
            /// </summary>
            public int Low {
                get { return m_low; }
            }

            /// <summary>
            /// The upper bound (exclusive) of the elements published to readers.
            /// </summary>
            public int Hi {
                get { return m_hi; }
            }

            /// <summary>
            /// Tries to store the value in the chunk.
            /// </summary>
            /// <returns><c>true</c>, if the value was stored, <c>false</c> if there is no free slot.</returns>
            /// <param name="value">The value to store.</param>
            /// <param name="extend"><c>true</c> only for the caller which claimed the index equal
            /// to the chunk capacity, i.e. the first one who found the chunk full.</param>
            public bool TryEnqueue(T value, out bool extend) {
                // claim a slot, the claimed index is the value of the counter before the increment
                var alloc = Interlocked.Increment(ref m_alloc) - 1;

                if (alloc >= m_size) {
                    extend = alloc == m_size;
                    return false;
                }

                extend = false;
                m_data[alloc] = value;

                // publish the slot, m_hi is advanced strictly in the order of the claimed indices,
                // so wait until all preceding slots are published
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                    // spin wait for commit
                }
                return true;
            }

            /// <summary>
            /// Prevents from allocating new space in the chunk and waits for all write operations to complete
            /// </summary>
            public void Commit() {
                // after the exchange every attempt to claim a slot gets an index above m_size and fails,
                // the previous counter value (limited by the capacity) is the number of claimed slots
                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);

                // wait until all claimed slots are published
                while (m_hi != actual)
                    Thread.MemoryBarrier();
            }

            /// <summary>
            /// Tries to take the next published element from the chunk.
            /// </summary>
            /// <returns><c>true</c>, if an element was taken, <c>false</c> if there are no published elements.</returns>
            /// <param name="value">The dequeued element or the default value.</param>
            /// <param name="recycle"><c>true</c> when the read position has reached the chunk capacity,
            /// either by this call or before it.</param>
            public bool TryDequeue(out T value, out bool recycle) {
                int low;
                do {
                    low = m_low;
                    if (low >= m_hi) {
                        value = default(T);
                        recycle = (low == m_size);
                        return false;
                    }
                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));

                // the element at the last index of the chunk has been taken
                recycle = (low == m_size - 1);
                value = m_data[low];

                return true;
            }

            /// <summary>
            /// Tries to store a part of the batch in the chunk.
            /// </summary>
            /// <returns><c>true</c>, if at least one element was stored, <c>false</c> otherwise.</returns>
            /// <param name="batch">The source buffer.</param>
            /// <param name="offset">The offset of the data in the source buffer.</param>
            /// <param name="length">The number of elements to store.</param>
            /// <param name="enqueued">The number of elements actually stored.</param>
            /// <param name="extend"><c>true</c> when not all elements fit in the chunk and
            /// the caller is expected to extend the queue.</param>
            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
                // claim the slots for the whole batch, alloc is the index of the first claimed slot
                var alloc = Interlocked.Add(ref m_alloc, length) - length;
                if (alloc > m_size) {
                    // the chunk is full and someone already
                    // creating the new one
                    enqueued = 0; // nothing was added
                    extend = false; // the caller shouldn't try to extend the queue
                    return false; // nothing was added
                }

                // only the part which fits in the remaining space is stored
                enqueued = Math.Min(m_size - alloc, length);
                extend = length > enqueued;

                if (enqueued == 0)
                    return false;


                Array.Copy(batch, offset, m_data, alloc, enqueued);

                // publish the stored part once all preceding slots are published
                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
                    // spin wait for commit
                }

                return true;
            }

            /// <summary>
            /// Tries to take up to <paramref name="length"/> published elements from the chunk.
            /// </summary>
            /// <returns><c>true</c>, if at least one element was taken, <c>false</c> otherwise.</returns>
            /// <param name="buffer">The destination buffer.</param>
            /// <param name="offset">The offset in the destination buffer.</param>
            /// <param name="length">The maximum number of elements to take.</param>
            /// <param name="dequeued">The number of elements actually taken.</param>
            /// <param name="recycle"><c>true</c> when the read position has reached the chunk capacity,
            /// either by this call or before it.</param>
            public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
                int low, hi, batchSize;

                do {
                    low = m_low;
                    hi = m_hi;
                    if (low >= hi) {
                        dequeued = 0;
                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
                        return false;
                    }
                    batchSize = Math.Min(hi - low, length);
                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));

                // the taken range ends exactly at the end of the chunk
                recycle = (low == m_size - batchSize);
                dequeued = batchSize;

                Array.Copy(m_data, low, buffer, offset, batchSize);

                return true;
            }

            /// <summary>
            /// Returns the element stored at the specified index, no range checks are made
            /// against the read and write positions.
            /// </summary>
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        /// <summary>
        /// The initial value of the chunk size used by the queue.
        /// </summary>
        public const int DEFAULT_CHUNK_SIZE = 32;

        /// <summary>
        /// The maximum number of elements which <c>EnqueueRange</c> places in a single new chunk.
        /// </summary>
        public const int MAX_CHUNK_SIZE = 262144;

        // the capacity passed to the chunks created by the queue
        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

        // the head chunk, elements are dequeued from it
        Chunk m_first;
        // the tail chunk, elements are enqueued to it
        Chunk m_last;

        /// <summary>
        /// Creates an empty queue which consists of a single empty chunk.
        /// </summary>
        public AsyncQueue() {
            var initial = new Chunk(m_chunkSize);
            m_first = initial;
            m_last = initial;
        }

        /// <summary>
        /// Appends a single value to the tail of the queue.
        /// </summary>
        /// <param name="value">The value which will be added to the queue.</param>
        public void Enqueue(T value) {
            var tail = m_last;
            // remains true until a chunk reports otherwise
            var mustExtend = true;

            while (true) {
                if (tail != null && tail.TryEnqueue(value, out mustExtend))
                    return; // the value is stored in the current tail chunk

                if (tail == null || mustExtend) {
                    // there is no tail chunk or this caller has to extend the queue,
                    // the new chunk already contains the value
                    if (EnqueueChunk(tail, new Chunk(m_chunkSize, value)))
                        return;
                } else {
                    // the tail chunk is full and the queue is being extended
                    // by someone else, spin until the tail is replaced
                    while (tail == m_last)
                        Thread.MemoryBarrier();
                }

                // take the fresh tail and try again
                tail = m_last;
            }
        }

        /// <summary>
        /// Adds the specified data to the queue.
        /// </summary>
        /// <param name="data">The buffer which contains the data to be enqueued.</param>
        /// <param name="offset">The offset of the data in the buffer.</param>
        /// <param name="length">The size of the data to read from the buffer.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is less than 1 or the specified range doesn't fit in the buffer.</exception>
        public void EnqueueRange(T[] data, int offset, int length) {
            if (data == null)
                throw new ArgumentNullException("data");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // the range is compared with the remaining space instead of calculating
            // offset + length, since the sum may overflow and let an invalid range to pass
            // the check, which is detected only after the space in a chunk is already reserved
            if (length < 1 || length > data.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var last = m_last;

            bool extend;
            int enqueued;

            while (length > 0) {
                // if there is no tail chunk the queue must be extended
                extend = true;
                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
                    length -= enqueued;
                    offset += enqueued;
                }

                if (extend) {
                    // there was no enough space in the chunk
                    // or there was no chunks in the queue

                    while (length > 0) {

                        var size = Math.Min(length, MAX_CHUNK_SIZE);

                        // the allocation counter of the new chunk is set to the whole remaining
                        // length, when the data doesn't fit in a single chunk the counter exceeds
                        // the chunk capacity and other writers will not try to extend the queue
                        var chunk = new Chunk(
                            Math.Max(size, m_chunkSize),
                            data,
                            offset,
                            size,
                            length // length >= size
                        );

                        if (!EnqueueChunk(last, chunk)) {
                            // looks like the queue has been updated then proceed from the beginning
                            last = m_last;
                            break;
                        }

                        // we have successfully added the new chunk
                        last = chunk;
                        length -= size;
                        offset += size;
                    }
                } else {
                    // we don't need to extend the queue, if we successfully enqueued data
                    if (length == 0)
                        break;

                    // if we need to wait while someone is extending the queue
                    // spinwait
                    while (last == m_last) {
                        Thread.MemoryBarrier();
                    }

                    last = m_last;
                }
            }
        }

        /// <summary>
        /// Tries to take the element from the head of the queue.
        /// </summary>
        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
        /// <param name="value">The dequeued element, or the default value when nothing was dequeued.</param>
        public bool TryDequeue(out T value) {
            for (var head = m_first; head != null; head = m_first) {
                bool exhausted;
                var taken = head.TryDequeue(out value, out exhausted);

                // the chunk is still usable, so its answer is final
                if (!exhausted)
                    return taken;

                // the chunk is read to the end, remove it from the queue
                RecycleFirstChunk(head);

                // the last element of the exhausted chunk has been taken by this call
                if (taken)
                    return true;

                // nothing was taken, proceed with the new head
            }

            // there are no chunks in the queue
            value = default(T);
            return false;
        }

        /// <summary>
        /// Tries to dequeue the specified amount of data from the queue.
        /// </summary>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of data to be retrieved.</param>
        /// <param name="dequeued">The actual amount of the retrieved data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is less than 1 or the specified range doesn't fit in the buffer.</exception>
        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // the range is compared with the remaining space instead of calculating
            // offset + length, since the sum may overflow and let an invalid range to pass
            // the check, in that case the elements are removed from a chunk before the copy fails
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            bool recycle;
            dequeued = 0;
            while (chunk != null) {

                int actual;
                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                    offset += actual;
                    length -= actual;
                    dequeued += actual;
                }

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else if (actual == 0)
                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

                // the requested amount of data is retrieved
                if (length == 0)
                    return true;

                // we still may dequeue something
                // try again
                chunk = m_first;
            }

            return dequeued != 0;
        }

        /// <summary>
        /// Tries to dequeue all remaining data in the first chunk.
        /// </summary>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of the data to be dequeued.</param>
        /// <param name="dequeued">The actual amount of the dequeued data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
        /// <paramref name="length"/> is less than 1 or the specified range doesn't fit in the buffer.</exception>
        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // the range is compared with the remaining space instead of calculating
            // offset + length, since the sum may overflow and let an invalid range to pass
            // the check, in that case the elements are removed from a chunk before the copy fails
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            bool recycle;
            dequeued = 0;

            while (chunk != null) {

                int actual;
                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                    dequeued = actual;
                }

                if (recycle) // this chunk is waste
                    RecycleFirstChunk(chunk);
                else if (actual == 0)
                    // the chunk is usable, but doesn't contain any data, without this exit
                    // the same chunk would be polled again and again while the queue is empty
                    break;

                // if we have dequeued any data, then return
                if (dequeued != 0)
                    return true;

                // the wasted chunk was empty, we still may dequeue something
                // from the next one, try again
                chunk = m_first;
            }

            return false;
        }

        /// <summary>
        /// Tries to make the specified chunk the new tail of the queue.
        /// </summary>
        /// <returns><c>true</c>, if the tail was replaced, <c>false</c> if the tail
        /// is no longer the <paramref name="last"/> chunk.</returns>
        /// <param name="last">The chunk which the caller considers to be the tail, can be <c>null</c>.</param>
        /// <param name="chunk">The chunk to append.</param>
        bool EnqueueChunk(Chunk last, Chunk chunk) {
            var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
            if (observed != last)
                return false; // the tail has been changed by someone else

            if (last == null) {
                // there was no tail, the new chunk becomes the head too
                m_first = chunk;
            } else {
                // link the new chunk to the previous tail
                last.next = chunk;
            }

            return true;
        }

        /// <summary>
        /// Removes the specified chunk from the head of the queue.
        /// </summary>
        /// <param name="first">The chunk which the caller considers to be the head.</param>
        void RecycleFirstChunk(Chunk first) {
            var successor = first.next;

            // move the head to the next chunk, give up if the head is already changed
            if (Interlocked.CompareExchange(ref m_first, successor, first) != first)
                return;

            // the removed chunk has a successor, the tail isn't affected
            if (successor != null)
                return;

            // the removed chunk was the only one, so the tail should be cleared too
            if (Interlocked.CompareExchange(ref m_last, null, first) != first) {
                // race
                // the tail is already updated by someone else, restore the head
                m_first = first;
            }
        }

        /// <summary>
        /// Discards the contents of the queue by replacing its chunks with a new empty chunk.
        /// </summary>
        public void Clear() {
            // the chunk which will start the new queue
            var replacement = new Chunk(m_chunkSize);

            for (;;) {
                Thread.MemoryBarrier();
                var head = m_first;
                var tail = m_last;

                // there is no tail chunk, nothing to clear
                if (tail == null)
                    return;

                // the head must exist and must be either linked to the next chunk or be the tail,
                // otherwise the queue is being modified and the pointers should be read again
                var consistent = head != null && (head.next != null || head == tail);

                // replacing the head first leaves the tail pointing to the old chunks
                // until the assignment below
                if (consistent && head == Interlocked.CompareExchange(ref m_first, replacement, head)) {
                    m_last = replacement;
                    return;
                }
            }
        }

        /// <summary>
        /// Replaces the chunks of the queue with a new empty chunk and returns
        /// the elements read from the detached chunks.
        /// </summary>
        /// <returns>The array of the retrieved elements, empty when the queue has no tail chunk.</returns>
        public T[] Drain() {
            // the chunk which will start the new queue
            var replacement = new Chunk(m_chunkSize);

            for (;;) {
                Thread.MemoryBarrier();
                var head = m_first;
                var tail = m_last;

                if (tail == null)
                    return new T[0];

                // the head must exist and must be either linked to the next chunk or be the tail,
                // otherwise the queue is being modified and the pointers should be read again
                var consistent = head != null && (head.next != null || head == tail);
                if (!consistent)
                    continue;

                // replacing the head first leaves the tail pointing to the old chunks
                // until the exchange below
                if (Interlocked.CompareExchange(ref m_first, replacement, head) != head)
                    continue; // the head has been changed, start over

                // the actual tail of the detached chunks is the one replaced here
                tail = Interlocked.Exchange(ref m_last, replacement);

                return ReadChunks(head, tail);
            }
        }
            
        /// <summary>
        /// Reads all elements from the chain of chunks.
        /// </summary>
        /// <returns>The array of the elements in the order they were read.</returns>
        /// <param name="chunk">The first chunk to read.</param>
        /// <param name="last">The chunk at which the reading stops (inclusive), compared by reference.</param>
        T[] ReadChunks(Chunk chunk, object last) {
            var items = new List<T>();
            var scratch = new T[m_chunkSize];

            for (var current = chunk; current != null; current = current.next) {
                // no more space can be allocated in the chunk after this call
                // and all pending writes are complete
                current.Commit();

                // the chunk is read through the regular dequeue operation, since
                // other clients may still be dequeuing from it concurrently
                int count;
                bool ignored;
                while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out ignored))
                    items.AddRange(new ArraySegmentCollection(scratch, 0, count));

                if (current == last)
                    break;

                // the link to the next chunk may not be assigned yet, wait for it
                while (current.next == null)
                    Thread.MemoryBarrier();
            }

            return items.ToArray();
        }

        /// <summary>
        /// A read-only view of an array segment exposed as a collection, this allows
        /// to add a part of an array to a list with a single block copy.
        /// </summary>
        struct ArraySegmentCollection : ICollection<T> {
            readonly T[] m_data;
            readonly int m_offset;
            readonly int m_length;

            /// <summary>
            /// Creates the view of the specified segment, the data isn't copied.
            /// </summary>
            /// <param name="data">The underlying array.</param>
            /// <param name="offset">The index of the first element of the segment.</param>
            /// <param name="length">The number of elements in the segment.</param>
            public ArraySegmentCollection(T[] data, int offset, int length) {
                m_data = data;
                m_offset = offset;
                m_length = length;
            }

            #region ICollection implementation

            // the collection is read-only, all modifying operations report this
            // in the same way using the exception conventional for read-only collections

            public void Add(T item) {
                throw new NotSupportedException();
            }

            public void Clear() {
                throw new NotSupportedException();
            }

            /// <summary>
            /// Determines whether the segment contains the specified item.
            /// </summary>
            public bool Contains(T item) {
                // only the elements of the segment are examined
                return Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
            }

            /// <summary>
            /// Copies the elements of the segment to the specified array.
            /// </summary>
            public void CopyTo(T[] array, int arrayIndex) {
                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
            }

            public bool Remove(T item) {
                throw new NotSupportedException();
            }

            public int Count {
                get {
                    return m_length;
                }
            }

            public bool IsReadOnly {
                get {
                    return true;
                }
            }

            #endregion

            #region IEnumerable implementation

            public IEnumerator<T> GetEnumerator() {
                for (int i = m_offset; i < m_length + m_offset; i++)
                    yield return m_data[i];
            }

            #endregion

            #region IEnumerable implementation

            IEnumerator IEnumerable.GetEnumerator() {
                return GetEnumerator();
            }

            #endregion
        }

        #region IEnumerable implementation

        /// <summary>
        /// Enumerates the elements of the chain of chunks without removing them.
        /// The enumerator reads the current state of the chunks and makes no snapshot.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            // the chunk which is being enumerated, null when the enumeration is finished
            Chunk m_current;
            // the position in the current chunk, -1 until the enumeration is started
            int m_pos = -1;

            /// <summary>
            /// Creates the enumerator which starts from the specified chunk.
            /// </summary>
            /// <param name="first">The first chunk to enumerate, can be <c>null</c>.</param>
            public Enumerator(Chunk first) {
                m_current = first;
            }

            #region IEnumerator implementation

            /// <summary>
            /// Advances the enumerator to the next element.
            /// </summary>
            /// <returns><c>true</c>, if the enumerator points to an element,
            /// <c>false</c> if there are no more elements.</returns>
            public bool MoveNext() {
                if (m_current == null)
                    return false;

                if (m_pos == -1)
                    m_pos = m_current.Low;
                else
                    m_pos++;

                // the position is beyond the readable part of the chunk, move to the next
                // chunk which has elements, empty chunks are skipped and the end of the
                // chain ends the enumeration
                while (m_pos >= m_current.Hi) {
                    m_current = m_current.next;
                    if (m_current == null)
                        return false;
                    m_pos = m_current.Low;
                }

                return true;
            }

            public void Reset() {
                throw new NotSupportedException();
            }

            object IEnumerator.Current {
                get {
                    return Current;
                }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            /// <summary>
            /// Gets the element at the current position.
            /// </summary>
            /// <exception cref="InvalidOperationException">The enumeration isn't started or is already finished.</exception>
            public T Current {
                get {
                    if (m_pos == -1 || m_current == null)
                        throw new InvalidOperationException();
                    return m_current.GetAt(m_pos);
                }
            }

            #endregion
        }

        /// <summary>
        /// Returns the enumerator which starts from the current head chunk of the queue.
        /// </summary>
        public IEnumerator<T> GetEnumerator() {
            var head = m_first;
            return new Enumerator(head);
        }

        #endregion

        #region IEnumerable implementation

        // the non-generic enumeration is served by the generic enumerator
        IEnumerator IEnumerable.GetEnumerator() {
            IEnumerator<T> typed = GetEnumerator();
            return typed;
        }

        #endregion
    }
}