diff Implab/Parallels/AsyncQueue.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents 8d5de4eb9c2c
children 8dd666e6b6bf
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -7,11 +7,11 @@
 namespace Implab.Parallels {
     public class AsyncQueue<T> : IEnumerable<T> {
         class Chunk {
-            public Chunk next;
+            public volatile Chunk next;
 
-            int m_low;
-            int m_hi;
-            int m_alloc;
+            volatile int m_low;
+            volatile int m_hi;
+            volatile int m_alloc;
             readonly int m_size;
             readonly T[] m_data;
 
@@ -28,12 +28,15 @@
                 m_data[0] = value;
             }
 
-            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+            public Chunk(int size, int allocated) {
                 m_size = size;
-                m_hi = length;
-                m_alloc = alloc;
+                m_hi = allocated;
+                m_alloc = allocated;
                 m_data = new T[size];
-                Array.Copy(data, offset, m_data, 0, length);
+            }
+
+            public void WriteData(T[] data, int offset, int dest, int length) {
+                Array.Copy(data, offset, m_data, dest, length);
             }
 
             public int Low {
@@ -48,31 +51,36 @@
                 get { return m_size; }
             }
 
-            public bool TryEnqueue(T value, out bool extend) {
-                var alloc = Interlocked.Increment(ref m_alloc) - 1;
-
-                if (alloc >= m_size) {
-                    extend = alloc == m_size;
-                    return false;
-                }
-
-                extend = false;
+            public bool TryEnqueue(T value) {
+                int alloc;
+                do {
+                    alloc = m_alloc;
+                    if (alloc >= m_size)
+                        return false;
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
+                
                 m_data[alloc] = value;
 
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
+                SpinWait spin = new SpinWait();
+                // m_hi is volatile
+                while (alloc != m_hi) {
                     // spin wait for commit
+                    spin.SpinOnce();
                 }
+                m_hi = alloc + 1;
+
                 return true;
             }
 
             /// <summary>
             /// Prevents from allocating new space in the chunk and waits for all write operations to complete
             /// </summary>
-            public void Commit() {
-                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
-
-                while (m_hi != actual)
-                    Thread.MemoryBarrier();
+            public void Seal() {
+                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
+                SpinWait spin = new SpinWait();
+                while (m_hi != actual) {
+                    spin.SpinOnce();
+                }
             }
 
             public bool TryDequeue(out T value, out bool recycle) {
@@ -84,44 +92,38 @@
                         recycle = (low == m_size);
                         return false;
                     }
-                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
+                } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
 
-                recycle = (low == m_size - 1);
+                recycle = (low + 1 == m_size);
                 value = m_data[low];
 
                 return true;
             }
 
-            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
-                //int alloc;
-                //int allocSize;
+            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
+                int alloc;
+                do {
+                    alloc = m_alloc;
+                    if (alloc >= m_size) {
+                        enqueued = 0;
+                        return false;
+                    } else {
+                        enqueued = Math.Min(length, m_size - alloc);
+                    }
+                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
+                
+                Array.Copy(batch, offset, m_data, alloc, enqueued);
 
-                var alloc = Interlocked.Add(ref m_alloc, length) - length;
-                if (alloc > m_size) {
-                    // the chunk is full and someone already
-                    // creating the new one
-                    enqueued = 0; // nothing was added
-                    extend = false; // the caller shouldn't try to extend the queue
-                    return false; // nothing was added
+                SpinWait spin = new SpinWait();
+                while (alloc != m_hi) {
+                    spin.SpinOnce();
                 }
 
-                enqueued = Math.Min(m_size - alloc, length);
-                extend = length > enqueued;
-
-                if (enqueued == 0)
-                    return false;
-
-
-                Array.Copy(batch, offset, m_data, alloc, enqueued);
-
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
-                    // spin wait for commit
-                }
-
+                m_hi = alloc + enqueued;
                 return true;
             }
 
-            public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
+            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
                 int low, hi, batchSize;
 
                 do {
@@ -129,15 +131,14 @@
                     hi = m_hi;
                     if (low >= hi) {
                         dequeued = 0;
-                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        recycle = (low == m_size);
                         return false;
                     }
                     batchSize = Math.Min(hi - low, length);
-                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+                } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
 
-                recycle = (low == m_size - batchSize);
                 dequeued = batchSize;
-
+                recycle = (low + batchSize == m_size);
                 Array.Copy(m_data, low, buffer, offset, batchSize);
 
                 return true;
@@ -149,32 +150,33 @@
         }
 
         public const int DEFAULT_CHUNK_SIZE = 32;
-        public const int MAX_CHUNK_SIZE = 262144;
+        public const int MAX_CHUNK_SIZE = 256;
 
         Chunk m_first;
         Chunk m_last;
 
+        public AsyncQueue() {
+            m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
+        }
+
         /// <summary>
         /// Adds the specified value to the queue.
         /// </summary>
         /// <param name="value">Tha value which will be added to the queue.</param>
-        public virtual void Enqueue(T value) {
+        public void Enqueue(T value) {
             var last = m_last;
-            // spin wait to the new chunk
-            bool extend = true;
-            while (last == null || !last.TryEnqueue(value, out extend)) {
+            SpinWait spin = new SpinWait();
+            while (!last.TryEnqueue(value)) {
                 // try to extend queue
-                if (extend || last == null) {
-                    var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
-                    if (EnqueueChunk(last, chunk))
-                        break; // success! exit!
-                    last = m_last;
+                var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
+                var t = Interlocked.CompareExchange(ref m_last, chunk, last);
+                if (t == last) {
+                    last.next = chunk;
+                    break;
                 } else {
-                    while (last == m_last) {
-                        Thread.MemoryBarrier();
-                    }
-                    last = m_last;
+                    last = t;
                 }
+                spin.SpinOnce();
             }
         }
 
@@ -184,67 +186,54 @@
         /// <param name="data">The buffer which contains the data to be enqueued.</param>
         /// <param name="offset">The offset of the data in the buffer.</param>
         /// <param name="length">The size of the data to read from the buffer.</param>
-        public virtual void EnqueueRange(T[] data, int offset, int length) {
+        public void EnqueueRange(T[] data, int offset, int length) {
             if (data == null)
                 throw new ArgumentNullException("data");
-            if (length == 0)
-                return;
             if (offset < 0)
                 throw new ArgumentOutOfRangeException("offset");
             if (length < 1 || offset + length > data.Length)
                 throw new ArgumentOutOfRangeException("length");
 
-            var last = m_last;
+            while (length > 0) {
+                var last = m_last;
+                int enqueued;
 
-            bool extend;
-            int enqueued;
-
-            while (length > 0) {
-                extend = true;
-                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
                     length -= enqueued;
                     offset += enqueued;
                 }
 
-                if (extend) {
-                    // there was no enough space in the chunk
-                    // or there was no chunks in the queue
+                if (length > 0) {
+                    // we have something to enqueue
 
-                    while (length > 0) {
-
-                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+                    var tail = length % MAX_CHUNK_SIZE;
 
-                        var chunk = new Chunk(
-                            Math.Max(size, DEFAULT_CHUNK_SIZE),
-                            data,
-                            offset,
-                            size,
-                            length // length >= size
-                        );
+                    var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
+
+                    if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
+                        continue; // we wasn't able to catch the writer, roundtrip
 
-                        if (!EnqueueChunk(last, chunk)) {
-                            // looks like the queue has been updated then proceed from the beginning
-                            last = m_last; 
-                            break;
-                        }
+                    // we are lucky
+                    // we can exclusively write our batch, the other writers will continue their work
+
+                    length -= tail;
 
-                        // we have successfully added the new chunk
-                        last = chunk;
-                        length -= size;
-                        offset += size;
-                    }
-                } else {
-                    // we don't need to extend the queue, if we successfully enqueued data
-                    if (length == 0)
-                        break;
-
-                    // if we need to wait while someone is extending the queue
-                    // spinwait
-                    while (last == m_last) {
-                        Thread.MemoryBarrier();
+                    
+                    for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
+                        var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
+                        node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
+                        offset += MAX_CHUNK_SIZE;
+                        // fence last.next is volatile
+                        last.next = node;
+                        last = node;
                     }
 
-                    last = m_last;
+                    if (tail > 0)
+                        chunk.WriteData(data, offset, 0, tail);
+                    
+                    // fence last.next is volatile
+                    last.next = chunk;
+                    return;
                 }
             }
         }
@@ -256,26 +245,21 @@
         /// <param name="value">The value of the dequeued element.</param>
         public bool TryDequeue(out T value) {
             var chunk = m_first;
-            bool recycle;
-            while (chunk != null) {
+            do {
+                bool recycle;
 
                 var result = chunk.TryDequeue(out value, out recycle);
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-                else
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
                     return result; // this chunk is usable and returned actual result
+                }
 
                 if (result) // this chunk is waste but the true result is always actual
                     return true;
-
-                // try again
-                chunk = m_first;
-            }
-
-            // the queue is empty
-            value = default(T);
-            return false;
+            } while (true);
         }
 
         /// <summary>
@@ -295,10 +279,9 @@
                 throw new ArgumentOutOfRangeException("length");
 
             var chunk = m_first;
-            bool recycle;
             dequeued = 0;
-            while (chunk != null) {
-
+            do {
+                bool recycle;
                 int actual;
                 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                     offset += actual;
@@ -306,18 +289,16 @@
                     dequeued += actual;
                 }
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-                else if (actual == 0)
-                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
+                    chunk = null;
+                }
 
                 if (length == 0)
                     return true;
-
-                // we still may dequeue something
-                // try again
-                chunk = m_first;
-            }
+            } while (chunk != null);
 
             return dequeued != 0;
         }
@@ -339,123 +320,81 @@
                 throw new ArgumentOutOfRangeException("length");
 
             var chunk = m_first;
-            bool recycle;
-            dequeued = 0;
-
-            while (chunk != null) {
+            do {
+                bool recycle;
+                chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
 
-                int actual;
-                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
-                    dequeued = actual;
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
+                    chunk = null;
                 }
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-
                 // if we have dequeued any data, then return
                 if (dequeued != 0)
                     return true;
 
-                // we still may dequeue something
-                // try again
-                chunk = m_first;
-            }
+            } while (chunk != null);
 
             return false;
         }
-
-        bool EnqueueChunk(Chunk last, Chunk chunk) {
-            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
-                return false;
-
-            if (last != null)
-                last.next = chunk;
-            else {
-                m_first = chunk;
-            }
-            return true;
-        }
-
-        void RecycleFirstChunk(Chunk first) {
-            var next = first.next;
-
-            if (first != Interlocked.CompareExchange(ref m_first, next, first))
-                return;
-
-            if (next == null) {
-
-                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-
-                    // race
-                    // someone already updated the tail, restore the pointer to the queue head
-                    m_first = first;
-                }
-                // the tail is updated
-            }
-        }
+        
 
         public void Clear() {
             // start the new queue
             var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
             do {
-                Thread.MemoryBarrier();
                 var first = m_first;
-                var last = m_last;
-
-                if (last == null) // nothing to clear
-                    return;
-
-                if (first == null || (first.next == null && first != last)) // inconcistency
+                if (first.next == null && first != m_last) {
                     continue;
-
-                // here we will create inconsistency which will force others to spin
-                // and prevent from fetching. chunk.next = null
-                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
-                    continue;// inconsistent
-
-                m_last = chunk;
-
-                return;
-
-            } while(true);
-        }
-
-        public T[] Drain() {
-            // start the new queue
-            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
-            do {
-                Thread.MemoryBarrier();
-                var first = m_first;
-                var last = m_last;
-
-                if (last == null)
-                    return new T[0];
-
-                if (first == null || (first.next == null && first != last))
-                    continue;
+                }
 
                 // here we will create inconsistency which will force others to spin
                 // and prevent from fetching. chunk.next = null
                 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
                     continue;// inconsistent
 
-                last = Interlocked.Exchange(ref m_last, chunk);
+                m_last = chunk;
+                return;
+            } while (true);
+        }
+
+        public List<T> Drain() {
+            // start the new queue
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
+            do {
+                var first = m_first;
+                // first.next is volatile
+                if (first.next == null) {
+                    if (first != m_last)
+                        continue;
+                    else if (first.Hi == first.Low)
+                        return new List<T>();
+                }
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+                    continue;// inconsistent
+
+                var last = Interlocked.Exchange(ref m_last, chunk);
 
                 return ReadChunks(first, last);
 
-            } while(true);
+            } while (true);
         }
-            
-        static T[] ReadChunks(Chunk chunk, object last) {
+
+        static List<T> ReadChunks(Chunk chunk, object last) {
             var result = new List<T>();
-            var buffer = new T[DEFAULT_CHUNK_SIZE];
+            var buffer = new T[MAX_CHUNK_SIZE];
             int actual;
             bool recycle;
+            SpinWait spin = new SpinWait();
             while (chunk != null) {
                 // ensure all write operations on the chunk are complete
-                chunk.Commit();
+                chunk.Seal();
 
                 // we need to read the chunk using this way
                 // since some client still may completing the dequeue
@@ -467,12 +406,12 @@
                     chunk = null;
                 } else {
                     while (chunk.next == null)
-                        Thread.MemoryBarrier();
+                        spin.SpinOnce();
                     chunk = chunk.next;
                 }
             }
 
-            return result.ToArray();
+            return result;
         }
 
         struct ArraySegmentCollection : ICollection<T> {
@@ -501,7 +440,7 @@
             }
 
             public void CopyTo(T[] array, int arrayIndex) {
-                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
+                Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
             }
 
             public bool Remove(T item) {