diff Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2

major update, added Drain method to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents f4d6ea6969cc
children f803565868a4
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -2,6 +2,7 @@
 using System.Collections.Generic;
 using System;
 using System.Collections;
+using System.Diagnostics;
 
 namespace Implab.Parallels {
     public class AsyncQueue<T> : IEnumerable<T> {
@@ -60,6 +61,16 @@
                 return true;
             }
 
+            /// <summary>
+            /// Prevents allocating new space in the chunk and waits for all write operations to complete
+            /// </summary>
+            public void Commit() {
+                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
+
+                while (m_hi != actual)
+                    Thread.MemoryBarrier();
+            }
+
             public bool TryDequeue(out T value, out bool recycle) {
                 int low;
                 do {
@@ -359,76 +370,114 @@
 
             if (last != null)
                 last.next = chunk;
-            else
+            else {
                 m_first = chunk;
+            }
             return true;
         }
 
         void RecycleFirstChunk(Chunk first) {
             var next = first.next;
 
+            if (first != Interlocked.CompareExchange(ref m_first, next, first))
+                return;
+
             if (next == null) {
-                // looks like this is the last chunk
+
                 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                    /*while (first.next == null)
+                        Thread.MemoryBarrier();*/
+
                     // race
-                    // maybe someone already recycled this chunk
-                    // or a new chunk has been appedned to the queue
-
-                    return; // give up
+                    // someone already updated the tail, restore the pointer to the queue head
+                    m_first = first;
                 }
                 // the tail is updated
             }
 
             // we need to update the head
-            Interlocked.CompareExchange(ref m_first, next, first);
+            //Interlocked.CompareExchange(ref m_first, next, first);
             // if the head is already updated then give up
-            return;
+            //return;
 
         }
 
         public void Clear() {
             // start the new queue
-            var t = new Chunk(m_chunkSize);
-            Thread.MemoryBarrier();
-            m_last = t;
-            Thread.MemoryBarrier();
+            var chunk = new Chunk(m_chunkSize);
+
+            do {
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null) // nothing to clear
+                    return;
 
-            // make the new queue available to the readers, and stop the old one
-            m_first = t;
-            Thread.MemoryBarrier();
+                if (first == null || (first.next == null && first != last)) // inconsistency
+                    continue;
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
+                    continue;// inconsistent
+
+                m_last = chunk;
+
+                return;
+
+            } while(true);
         }
 
         public T[] Drain() {
             // start the new queue
-            var t = new Chunk(m_chunkSize);
-            Thread.MemoryBarrier();
-            m_last = t;
-            Thread.MemoryBarrier();
-
-            // make the new queue available to the readers, and stop the old one
-            Chunk first;
+            var chunk = new Chunk(m_chunkSize);
 
             do {
-                first = m_first;
-            } while(first != Interlocked.CompareExchange(ref m_first
-            Thread.MemoryBarrier();
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null)
+                    return new T[0];
+
+                if (first == null || (first.next == null && first != last))
+                    continue;
 
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+                    continue;// inconsistent
 
+                last = Interlocked.Exchange(ref m_last, chunk);
+
+                return ReadChunks(first, last);
+
+            } while(true);
         }
             
-        T[] ReadChunks(Chunk chunk) {
+        T[] ReadChunks(Chunk chunk, object last) {
             var result = new List<T>();
             var buffer = new T[m_chunkSize];
             int actual;
             bool recycle;
             while (chunk != null) {
+                // ensure all write operations on the chunk are complete
+                chunk.Commit();
+
                 // we need to read the chunk using this way
                 // since some client still may completing the dequeue
                 // operation, such clients most likely won't get results
                 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
                     result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
 
-                chunk = chunk.next;
+                if (chunk == last) {
+                    chunk = null;
+                } else {
+                    while (chunk.next == null)
+                        Thread.MemoryBarrier();
+                    chunk = chunk.next;
+                }
             }
 
             return result.ToArray();