diff Implab/Parallels/AsyncQueue.cs @ 234:8dd666e6b6bf v2

Added implab nuget spec
author cin
date Thu, 05 Oct 2017 09:21:23 +0300
parents d6fe09f5592c
children
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Wed Oct 04 15:44:47 2017 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Thu Oct 05 09:21:23 2017 +0300
@@ -3,6 +3,7 @@
 using System;
 using System.Collections;
 using System.Diagnostics;
+using System.Runtime.CompilerServices;
 
 namespace Implab.Parallels {
     public class AsyncQueue<T> : IEnumerable<T> {
@@ -51,6 +52,16 @@
                 get { return m_size; }
             }
 
+            [MethodImpl(MethodImplOptions.AggressiveInlining)]
+            void AwaitWrites(int mark) {
+                if (m_hi != mark) {
+                    SpinWait spin = new SpinWait();
+                    do {
+                        spin.SpinOnce();
+                    } while (m_hi != mark);
+                }
+            }
+
             public bool TryEnqueue(T value) {
                 int alloc;
                 do {
@@ -61,12 +72,7 @@
                 
                 m_data[alloc] = value;
 
-                SpinWait spin = new SpinWait();
-                // m_hi is volatile
-                while (alloc != m_hi) {
-                    // spin wait for commit
-                    spin.SpinOnce();
-                }
+                AwaitWrites(alloc);
                 m_hi = alloc + 1;
 
                 return true;
@@ -77,10 +83,7 @@
             /// </summary>
             public void Seal() {
                 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
-                SpinWait spin = new SpinWait();
-                while (m_hi != actual) {
-                    spin.SpinOnce();
-                }
+                AwaitWrites(actual);
             }
 
             public bool TryDequeue(out T value, out bool recycle) {
@@ -114,11 +117,7 @@
                 
                 Array.Copy(batch, offset, m_data, alloc, enqueued);
 
-                SpinWait spin = new SpinWait();
-                while (alloc != m_hi) {
-                    spin.SpinOnce();
-                }
-
+                AwaitWrites(alloc);
                 m_hi = alloc + enqueued;
                 return true;
             }
@@ -361,9 +360,7 @@
         }
 
         public List<T> Drain() {
-            // start the new queue
-            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
+            Chunk chunk = null;
             do {
                 var first = m_first;
                 // first.next is volatile
@@ -374,6 +371,10 @@
                         return new List<T>();
                 }
 
+                // start the new queue
+                if (chunk == null)
+                    chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
                 // here we will create inconsistency which will force others to spin
                 // and prevent from fetching. chunk.next = null
                 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))