diff Implab/Parallels/AsyncQueue.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents 238e15580926
children 8d5de4eb9c2c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/AsyncQueue.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -0,0 +1,631 @@
+using System.Threading;
+using System.Collections.Generic;
+using System;
+using System.Collections;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public class AsyncQueue<T> : IEnumerable<T> {
+        class Chunk {
+            public Chunk next;
+
+            int m_low;
+            int m_hi;
+            int m_alloc;
+            readonly int m_size;
+            readonly T[] m_data;
+
+            public Chunk(int size) {
+                m_size = size;
+                m_data = new T[size];
+            }
+
+            public Chunk(int size, T value) {
+                m_size = size;
+                m_hi = 1;
+                m_alloc = 1;
+                m_data = new T[size];
+                m_data[0] = value;
+            }
+
+            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+                m_size = size;
+                m_hi = length;
+                m_alloc = alloc;
+                m_data = new T[size];
+                Array.Copy(data, offset, m_data, 0, length);
+            }
+
+            public int Low {
+                get { return m_low; }
+            }
+
+            public int Hi {
+                get { return m_hi; }
+            }
+
+            public int Size {
+                get { return m_size; }
+            }
+
+            public bool TryEnqueue(T value, out bool extend) {
+                var alloc = Interlocked.Increment(ref m_alloc) - 1;
+
+                if (alloc >= m_size) {
+                    extend = alloc == m_size;
+                    return false;
+                }
+
+                extend = false;
+                m_data[alloc] = value;
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
+                    // spin wait for commit
+                }
+                return true;
+            }
+
+            /// <summary>
+            /// Prevents from allocating new space in the chunk and waits for all write operations to complete
+            /// </summary>
+            public void Commit() {
+                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
+
+                while (m_hi != actual)
+                    Thread.MemoryBarrier();
+            }
+
+            public bool TryDequeue(out T value, out bool recycle) {
+                int low;
+                do {
+                    low = m_low;
+                    if (low >= m_hi) {
+                        value = default(T);
+                        recycle = (low == m_size);
+                        return false;
+                    }
+                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
+
+                recycle = (low == m_size - 1);
+                value = m_data[low];
+
+                return true;
+            }
+
+            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
+                //int alloc;
+                //int allocSize;
+
+                var alloc = Interlocked.Add(ref m_alloc, length) - length;
+                if (alloc > m_size) {
+                    // the chunk is full and someone already
+                    // creating the new one
+                    enqueued = 0; // nothing was added
+                    extend = false; // the caller shouldn't try to extend the queue
+                    return false; // nothing was added
+                }
+
+                enqueued = Math.Min(m_size - alloc, length);
+                extend = length > enqueued;
+
+                if (enqueued == 0)
+                    return false;
+
+
+                Array.Copy(batch, offset, m_data, alloc, enqueued);
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
+                    // spin wait for commit
+                }
+
+                return true;
+            }
+
+            public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
+                int low, hi, batchSize;
+
+                do {
+                    low = m_low;
+                    hi = m_hi;
+                    if (low >= hi) {
+                        dequeued = 0;
+                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        return false;
+                    }
+                    batchSize = Math.Min(hi - low, length);
+                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+
+                recycle = (low == m_size - batchSize);
+                dequeued = batchSize;
+
+                Array.Copy(m_data, low, buffer, offset, batchSize);
+
+                return true;
+            }
+
+            public T GetAt(int pos) {
+                return m_data[pos];
+            }
+        }
+
+        public const int DEFAULT_CHUNK_SIZE = 32;
+        public const int MAX_CHUNK_SIZE = 262144;
+
+        Chunk m_first;
+        Chunk m_last;
+
+        /// <summary>
+        /// Adds the specified value to the queue.
+        /// </summary>
+        /// <param name="value">Tha value which will be added to the queue.</param>
+        public virtual void Enqueue(T value) {
+            var last = m_last;
+            // spin wait to the new chunk
+            bool extend = true;
+            while (last == null || !last.TryEnqueue(value, out extend)) {
+                // try to extend queue
+                if (extend || last == null) {
+                    var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
+                    if (EnqueueChunk(last, chunk))
+                        break; // success! exit!
+                    last = m_last;
+                } else {
+                    while (last == m_last) {
+                        Thread.MemoryBarrier();
+                    }
+                    last = m_last;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Adds the specified data to the queue.
+        /// </summary>
+        /// <param name="data">The buffer which contains the data to be enqueued.</param>
+        /// <param name="offset">The offset of the data in the buffer.</param>
+        /// <param name="length">The size of the data to read from the buffer.</param>
+        public virtual void EnqueueRange(T[] data, int offset, int length) {
+            if (data == null)
+                throw new ArgumentNullException("data");
+            if (length == 0)
+                return;
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > data.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var last = m_last;
+
+            bool extend;
+            int enqueued;
+
+            while (length > 0) {
+                extend = true;
+                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                    length -= enqueued;
+                    offset += enqueued;
+                }
+
+                if (extend) {
+                    // there was no enough space in the chunk
+                    // or there was no chunks in the queue
+
+                    while (length > 0) {
+
+                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+
+                        var chunk = new Chunk(
+                            Math.Max(size, DEFAULT_CHUNK_SIZE),
+                            data,
+                            offset,
+                            size,
+                            length // length >= size
+                        );
+
+                        if (!EnqueueChunk(last, chunk)) {
+                            // looks like the queue has been updated then proceed from the beginning
+                            last = m_last; 
+                            break;
+                        }
+
+                        // we have successfully added the new chunk
+                        last = chunk;
+                        length -= size;
+                        offset += size;
+                    }
+                } else {
+                    // we don't need to extend the queue, if we successfully enqueued data
+                    if (length == 0)
+                        break;
+
+                    // if we need to wait while someone is extending the queue
+                    // spinwait
+                    while (last == m_last) {
+                        Thread.MemoryBarrier();
+                    }
+
+                    last = m_last;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tries to retrieve the first element from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="value">The value of the dequeued element.</param>
+        public bool TryDequeue(out T value) {
+            var chunk = m_first;
+            bool recycle;
+            while (chunk != null) {
+
+                var result = chunk.TryDequeue(out value, out recycle);
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+                else
+                    return result; // this chunk is usable and returned actual result
+
+                if (result) // this chunk is waste but the true result is always actual
+                    return true;
+
+                // try again
+                chunk = m_first;
+            }
+
+            // the queue is empty
+            value = default(T);
+            return false;
+        }
+
+        /// <summary>
+        /// Tries to dequeue the specified amount of data from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">The maximum amount of data to be retrieved.</param>
+        /// <param name="dequeued">The actual amout of the retrieved data.</param>
+        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    offset += actual;
+                    length -= actual;
+                    dequeued += actual;
+                }
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+                else if (actual == 0)
+                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+
+                if (length == 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return dequeued != 0;
+        }
+
+        /// <summary>
+        /// Tries to dequeue all remaining data in the first chunk.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
+        /// <param name="dequeued">The actual amount of the dequeued data.</param>
+        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    dequeued = actual;
+                }
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+
+                // if we have dequeued any data, then return
+                if (dequeued != 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return false;
+        }
+
+        bool EnqueueChunk(Chunk last, Chunk chunk) {
+            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
+                return false;
+
+            if (last != null)
+                last.next = chunk;
+            else {
+                m_first = chunk;
+            }
+            return true;
+        }
+
+        void RecycleFirstChunk(Chunk first) {
+            var next = first.next;
+
+            if (first != Interlocked.CompareExchange(ref m_first, next, first))
+                return;
+
+            if (next == null) {
+
+                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                    /*while (first.next == null)
+                        Thread.MemoryBarrier();*/
+
+                    // race
+                    // someone already updated the tail, restore the pointer to the queue head
+                    m_first = first;
+                }
+                // the tail is updated
+            }
+
+            // we need to update the head
+            //Interlocked.CompareExchange(ref m_first, next, first);
+            // if the head is already updated then give up
+            //return;
+
+        }
+
+        public void Clear() {
+            // start the new queue
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
+            do {
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null) // nothing to clear
+                    return;
+
+                if (first == null || (first.next == null && first != last)) // inconcistency
+                    continue;
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
+                    continue;// inconsistent
+
+                m_last = chunk;
+
+                return;
+
+            } while(true);
+        }
+
+        public T[] Drain() {
+            // start the new queue
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
+            do {
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null)
+                    return new T[0];
+
+                if (first == null || (first.next == null && first != last))
+                    continue;
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+                    continue;// inconsistent
+
+                last = Interlocked.Exchange(ref m_last, chunk);
+
+                return ReadChunks(first, last);
+
+            } while(true);
+        }
+            
+        static T[] ReadChunks(Chunk chunk, object last) {
+            var result = new List<T>();
+            var buffer = new T[DEFAULT_CHUNK_SIZE];
+            int actual;
+            bool recycle;
+            while (chunk != null) {
+                // ensure all write operations on the chunk are complete
+                chunk.Commit();
+
+                // we need to read the chunk using this way
+                // since some client still may completing the dequeue
+                // operation, such clients most likely won't get results
+                while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
+                    result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
+
+                if (chunk == last) {
+                    chunk = null;
+                } else {
+                    while (chunk.next == null)
+                        Thread.MemoryBarrier();
+                    chunk = chunk.next;
+                }
+            }
+
+            return result.ToArray();
+        }
+
+        struct ArraySegmentCollection : ICollection<T> {
+            readonly T[] m_data;
+            readonly int m_offset;
+            readonly int m_length;
+
+            public ArraySegmentCollection(T[] data, int offset, int length) {
+                m_data = data;
+                m_offset = offset;
+                m_length = length;
+            }
+
+            #region ICollection implementation
+
+            public void Add(T item) {
+                throw new NotSupportedException();
+            }
+
+            public void Clear() {
+                throw new NotSupportedException();
+            }
+
+            public bool Contains(T item) {
+                return false;
+            }
+
+            public void CopyTo(T[] array, int arrayIndex) {
+                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
+            }
+
+            public bool Remove(T item) {
+                throw new NotSupportedException();
+            }
+
+            public int Count {
+                get {
+                    return m_length;
+                }
+            }
+
+            public bool IsReadOnly {
+                get {
+                    return true;
+                }
+            }
+
+            #endregion
+
+            #region IEnumerable implementation
+
+            public IEnumerator<T> GetEnumerator() {
+                for (int i = m_offset; i < m_length + m_offset; i++)
+                    yield return m_data[i];
+            }
+
+            #endregion
+
+            #region IEnumerable implementation
+
+            IEnumerator IEnumerable.GetEnumerator() {
+                return GetEnumerator();
+            }
+
+            #endregion
+        }
+
+        #region IEnumerable implementation
+
+        class Enumerator : IEnumerator<T> {
+            Chunk m_current;
+            int m_pos = -1;
+
+            public Enumerator(Chunk fisrt) {
+                m_current = fisrt;
+            }
+
+            #region IEnumerator implementation
+
+            public bool MoveNext() {
+                if (m_current == null)
+                    return false;
+
+                if (m_pos == -1)
+                    m_pos = m_current.Low;
+                else
+                    m_pos++;
+
+                if (m_pos == m_current.Hi) {
+
+                    m_current = m_pos == m_current.Size ? m_current.next : null;
+
+                    m_pos = 0;
+
+                    if (m_current == null)
+                        return false;
+                }
+
+                return true;
+            }
+
+            public void Reset() {
+                throw new NotSupportedException();
+            }
+
+            object IEnumerator.Current {
+                get {
+                    return Current;
+                }
+            }
+
+            #endregion
+
+            #region IDisposable implementation
+
+            public void Dispose() {
+            }
+
+            #endregion
+
+            #region IEnumerator implementation
+
+            public T Current {
+                get {
+                    if (m_pos == -1 || m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.GetAt(m_pos);
+                }
+            }
+
+            #endregion
+        }
+
+        public IEnumerator<T> GetEnumerator() {
+            return new Enumerator(m_first);
+        }
+
+        #endregion
+
+        #region IEnumerable implementation
+
+        IEnumerator IEnumerable.GetEnumerator() {
+            return GetEnumerator();
+        }
+
+        #endregion
+    }
+}