diff Implab/Parallels/AsyncQueue.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents
children f1b897999260
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/AsyncQueue.cs	Sun Jan 11 19:13:02 2015 +0300
@@ -0,0 +1,244 @@
+using System.Threading;
+using System.Collections.Generic;
+using System;
+using System.Collections;
+
+namespace Implab.Parallels {
+    public class AsyncQueue<T> : IEnumerable<T> {
+        class Chunk {
+            public Chunk next;
+
+            int m_low;
+            int m_hi;
+            int m_alloc;
+            readonly int m_size;
+            readonly T[] m_data;
+
+            public Chunk(int size) {
+                m_size = size;
+                m_data = new T[size];
+            }
+
+            public Chunk(int size, T value) {
+                m_size = size;
+                m_hi = 1;
+                m_alloc = 1;
+                m_data = new T[size];
+                m_data[0] = value;
+            }
+
+            public int Low {
+                get { return m_low; }
+            }
+
+            public int Hi {
+                get { return m_hi; }
+            }
+
+            public bool TryEnqueue(T value,out bool extend) {
+                extend = false;
+                int alloc;
+                do {
+                    alloc = m_alloc;
+                    if (alloc > m_size)
+                        return false;
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
+
+                if (alloc == m_size) {
+                    extend = true;
+                    return false;
+                }
+                    
+                m_data[alloc] = value;
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
+                    // spin wait for commit
+                }
+                return true;
+            }
+
+            public bool TryDequeue(out T value,out bool recycle) {
+                int low;
+                do {
+                    low = m_low;
+                    if (low >= m_hi) {
+                        value = default(T);
+                        recycle = (low == m_size);
+                        return false;
+                    }
+                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
+
+                recycle = (low == m_size - 1);
+                value = m_data[low];
+
+                return true;
+            }
+
+            public T GetAt(int pos) {
+                return m_data[pos];
+            }
+        }
+
+        public const int DEFAULT_CHUNK_SIZE = 32;
+
+        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
+
+        Chunk m_first;
+        Chunk m_last;
+
+        public AsyncQueue() {
+            m_last = m_first = new Chunk(m_chunkSize);
+        }
+
+        public void Enqueue(T value) {
+            var last = m_last;
+            // spin wait to the new chunk
+            bool extend = true;
+            while(last == null || !last.TryEnqueue(value, out extend)) {
+                // try to extend queue
+                if (extend || last == null) {
+                    var chunk = new Chunk(m_chunkSize, value);
+                    if (EnqueueChunk(last, chunk))
+                        break;
+                    last = m_last;
+                } else {
+                    while (last != m_last) {
+                        Thread.MemoryBarrier();
+                        last = m_last;
+                    }
+                }
+            }
+        }
+
+        public bool TryDequeue(out T value) {
+            var chunk = m_first;
+            bool recycle;
+            while (chunk != null) {
+
+                var result = chunk.TryDequeue(out value, out recycle);
+
+                if (recycle) // this chunk is waste
+                    RecycleFirstChunk(chunk);
+                else
+                    return result; // this chunk is usable and returned actual result
+
+                if (result) // this chunk is waste but the true result is always actual
+                    return true;
+
+                // try again
+                chunk = m_first;
+            }
+
+            // the queue is empty
+            value = default(T);
+            return false;
+        }
+
+        bool EnqueueChunk(Chunk last, Chunk chunk) {
+            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
+                return false;
+
+            if (last != null)
+                last.next = chunk;
+            else
+                m_first = chunk;
+            return true;
+        }
+
+        void RecycleFirstChunk(Chunk first) {
+            var next = first.next;
+
+            if (next == null) {
+                // looks like this is the last chunk
+                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                    // race
+                    // maybe someone already recycled this chunk
+                    // or a new chunk has been appedned to the queue
+
+                    return; // give up
+                }
+                // the tail is updated
+            }
+
+            // we need to update the head
+            Interlocked.CompareExchange(ref m_first, next, first);
+            // if the head is already updated then give up
+            return;
+
+        }
+
+        #region IEnumerable implementation
+
+        class Enumerator : IEnumerator<T> {
+            Chunk m_current;
+            int m_pos = -1;
+
+            public Enumerator(Chunk fisrt) {
+                m_current = fisrt;
+            }
+
+            #region IEnumerator implementation
+
+            public bool MoveNext() {
+                if (m_current == null)
+                    return false;
+
+                if (m_pos == -1)
+                    m_pos = m_current.Low;
+                else
+                    m_pos++;
+                if (m_pos == m_current.Hi) {
+                    m_pos = 0;
+                    m_current = m_current.next;
+                }
+
+                return true;
+            }
+
+            public void Reset() {
+                throw new NotSupportedException();
+            }
+
+            object IEnumerator.Current {
+                get {
+                    return Current;
+                }
+            }
+
+            #endregion
+
+            #region IDisposable implementation
+
+            public void Dispose() {
+            }
+
+            #endregion
+
+            #region IEnumerator implementation
+
+            public T Current {
+                get {
+                    if (m_pos == -1 || m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.GetAt(m_pos);
+                }
+            }
+
+            #endregion
+        }
+
+        public IEnumerator<T> GetEnumerator() {
+            return new Enumerator(m_first);
+        }
+
+        #endregion
+
+        #region IEnumerable implementation
+
+        IEnumerator IEnumerable.GetEnumerator() {
+            return GetEnumerator();
+        }
+
+        #endregion
+    }
+}