changeset 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents 5f7a3e1d32b9
children 8dd666e6b6bf
files Implab.Playground/App.config Implab.Playground/Implab.Playground.csproj Implab.Playground/Program.cs Implab.Playground2.psess Implab.Test/AsyncTests.cs Implab.sln Implab/AbstractEvent.cs Implab/Automaton/AutomatonTransition.cs Implab/Implab.csproj Implab/Parallels/AsyncQueue.cs Implab/Parallels/BlockingQueue.cs Implab/Parallels/MTQueue.cs Implab/Parallels/SimpleAsyncQueue.cs
diffstat 13 files changed, 501 insertions(+), 431 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Playground/App.config	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.Playground/App.config	Wed Oct 04 15:44:47 2017 +0300
@@ -1,6 +1,6 @@
-<?xml version="1.0" encoding="utf-8" ?>
+<?xml version="1.0" encoding="utf-8"?>
 <configuration>
     <startup> 
-        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" />
+        <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5"/>
     </startup>
-</configuration>
\ No newline at end of file
+</configuration>
--- a/Implab.Playground/Implab.Playground.csproj	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.Playground/Implab.Playground.csproj	Wed Oct 04 15:44:47 2017 +0300
@@ -9,9 +9,10 @@
     <AppDesignerFolder>Properties</AppDesignerFolder>
     <RootNamespace>Implab.Playground</RootNamespace>
     <AssemblyName>Implab.Playground</AssemblyName>
-    <TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion>
+    <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
     <FileAlignment>512</FileAlignment>
     <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
+    <TargetFrameworkProfile />
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
     <PlatformTarget>AnyCPU</PlatformTarget>
--- a/Implab.Playground/Program.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.Playground/Program.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -1,10 +1,13 @@
 using Implab.Formats.Json;
+using Implab.Parallels;
 using Implab.Xml;
 using System;
+using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
 using System.Text;
+using System.Threading;
 using System.Threading.Tasks;
 using System.Xml;
 using System.Xml.Serialization;
@@ -12,29 +15,167 @@
 namespace Implab.Playground {
     public class Program {
 
-        [XmlRoot(Namespace = "XmlSimpleData")]
-        public class XmlSimpleModel {
-            [XmlElement]
-            public string Name { get; set; }
+        static void EnqueueRange<T>(ConcurrentQueue<T> q, T[] data, int offset, int len) {
+            for (var i = offset; i < offset + len; i++)
+                q.Enqueue(data[i]);
+        }
+
+        static bool TryDequeueRange<T>(ConcurrentQueue<T> q,T[] buffer,int offset, int len, out int actual) {
+            actual = 0;
+            T res;
+            while(q.TryDequeue(out res)) {
+                buffer[offset + actual] = res;
+                actual++;
+                if (actual == len)
+                    break;
+            }
+            return actual != 0;
+        }
+
+        static void EnqueueRange<T>(SimpleAsyncQueue<T> q, T[] data, int offset, int len) {
+            for (var i = offset; i < offset + len; i++)
+                q.Enqueue(data[i]);
+        }
 
-            [XmlElement]
-            public int Order { get; set; }
+        static bool TryDequeueRange<T>(SimpleAsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
+            actual = 0;
+            T res;
+            while (q.TryDequeue(out res)) {
+                buffer[offset + actual] = res;
+                actual++;
+                if (actual == len)
+                    break;
+            }
+            return actual != 0;
+        }
 
-            [XmlElement]
-            public string[] Items { get; set; }
+        /*
+        static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
+            for (var i = offset; i < offset + len; i++)
+                q.Enqueue(data[i]);
+        }
 
+        static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
+            actual = 0;
+            T res;
+            while (q.TryDequeue(out res)) {
+                buffer[offset + actual] = res;
+                actual++;
+                if (actual == len)
+                    break;
+            }
+            return actual != 0;
+        }
+        */
+        
+        static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) {
+            q.EnqueueRange(data, offset, len);
         }
 
+        static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) {
+            return q.TryDequeueRange(buffer, offset, len, out actual);
+        }
+        
+
         static void Main(string[] args) {
-            var model = new XmlSimpleModel {
-                Name = "Tablet",
-                Order = 10,
-                Items = new string[] { "z1", "z2", "z3" }
-            };
 
-            var doc = SerializationHelpers.SerializeAsXmlDocument(model);
-
-            var m2 = SerializationHelpers.DeserializeFromXmlNode<XmlSimpleModel>(doc.DocumentElement);
+            //var queue = new ConcurrentQueue<int>();
+            var queue = new AsyncQueue<int>();
+            //var queue = new SimpleAsyncQueue<int>();
+
+            const int wBatch = 32;
+            const long wCount = 1000000;
+            const long total = wBatch * wCount * 3;
+
+            long r1 = 0, r2 = 0, r3 = 0;
+            const int rBatch = 1000;
+            long read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        EnqueueRange(queue, buffer, 0, wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        EnqueueRange(queue, buffer, 0, wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        EnqueueRange(queue, buffer, 0, wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while (read < total) {
+                        int actual;
+                        if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                }/*,
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while (read < total) {
+                        int actual;
+                        if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }*//*,
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while (read < total) {
+                        int actual;
+                        if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r3 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #3: {0} ms", Environment.TickCount - t1);
+                }*/
+            )
+                .PromiseAll()
+                .Join();
+
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2 + r3,
+                total
+            );
 
             Console.WriteLine("done");
         }
--- a/Implab.Playground2.psess	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.Playground2.psess	Wed Oct 04 15:44:47 2017 +0300
@@ -48,8 +48,8 @@
       <LaunchProject>true</LaunchProject>
       <OverrideProjectSettings>false</OverrideProjectSettings>
       <LaunchMethod>Executable</LaunchMethod>
-      <ExecutablePath>Implab.Playground\bin\Release\Implab.Playground.exe</ExecutablePath>
-      <StartupDirectory>Implab.Playground\bin\Release\</StartupDirectory>
+      <ExecutablePath>Implab.Playground\bin\Debug\Implab.Playground.exe</ExecutablePath>
+      <StartupDirectory>Implab.Playground\bin\Debug\</StartupDirectory>
       <Arguments>
       </Arguments>
       <NetAppHost>IIS</NetAppHost>
@@ -67,9 +67,4 @@
       <ProjName>Implab.Playground</ProjName>
     </ProjBinary>
   </Binaries>
-  <Launches>
-    <ProjBinary>
-      <Path>:PB:{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj</Path>
-    </ProjBinary>
-  </Launches>
 </VSPerformanceSession>
\ No newline at end of file
--- a/Implab.Test/AsyncTests.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.Test/AsyncTests.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -222,9 +222,9 @@
 
         [TestMethod]
         public void MTQueueTest() {
-            var queue = new MTQueue<int>();
+            var queue = new SimpleAsyncQueue<int>();
             int res;
-
+            
             queue.Enqueue(10);
             Assert.IsTrue(queue.TryDequeue(out res));
             Assert.AreEqual(10, res);
@@ -242,8 +242,9 @@
             int readers = 0;
             var stop = new ManualResetEvent(false);
             int total = 0;
+            var ticks = Environment.TickCount;
 
-            const int itemsPerWriter = 10000;
+            const int itemsPerWriter = 1000000;
             const int writersCount = 10;
 
             for (int i = 0; i < writersCount; i++) {
@@ -278,7 +279,9 @@
 
             stop.WaitOne();
 
-            Assert.AreEqual(100000, total);
+            Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);
+
+            Assert.AreEqual(itemsPerWriter * writersCount, total);
         }
 
         [TestMethod]
@@ -509,13 +512,12 @@
         public void AsyncQueueDrainTest() {
             var queue = new AsyncQueue<int>();
 
-            const int wBatch = 11;
+            const int wBatch = 32;
             const int wCount = 200000;
             const int total = wBatch * wCount * 3;
             const int summ = wBatch * wCount * 3;
 
             int r1 = 0, r2 = 0;
-            const int rBatch = 11;
             int read = 0;
 
             var t1 = Environment.TickCount;
@@ -531,8 +533,12 @@
                     Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                 },
                 () => {
-                    for(int i =0; i < wCount * wBatch; i++)
-                        queue.Enqueue(1);
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        queue.EnqueueRange(buffer, 0, wBatch);
                     Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                 },
                 () => {
@@ -572,25 +578,34 @@
                 },*/
                 () => {
                     var count = 0;
-                    while(read < total) {
+                    int emptyDrains = 0;
+
+                    while (read < total) {    
                         var buffer = queue.Drain();
-                        for(int i=0; i< buffer.Length; i++)
+                        if (buffer.Count == 0)
+                            emptyDrains++;
+                        for(int i=0; i< buffer.Count; i++)
                             r1 += buffer[i];
-                            Interlocked.Add(ref read, buffer.Length);
-                        count += buffer.Length;
+                            Interlocked.Add(ref read, buffer.Count);
+                        count += buffer.Count;
                     }
-                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                    Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
                 },
                 () => {
-                    var count = 0;
-                    while(read < total) {
+                    var count = 0;
+                    int emptyDrains = 0;
+
+                    while (read < total) {
                         var buffer = queue.Drain();
-                        for(int i=0; i< buffer.Length; i++)
+                        if (buffer.Count == 0)
+                            emptyDrains++;
+
+                        for (int i=0; i< buffer.Count; i++)
                             r2 += buffer[i];
-                        Interlocked.Add(ref read, buffer.Length);
-                        count += buffer.Length;
+                        Interlocked.Add(ref read, buffer.Count);
+                        count += buffer.Count;
                     }
-                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
+                    Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
                 }
             )
                 .PromiseAll()
--- a/Implab.sln	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab.sln	Wed Oct 04 15:44:47 2017 +0300
@@ -14,8 +14,6 @@
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx", "Implab.Fx\Implab.Fx.csproj", "{06E706F8-6881-43EB-927E-FFC503AF6ABC}"
-EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}"
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}"
@@ -47,14 +45,6 @@
 		{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
 		{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU
-		{06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.Build.0 = Release|Any CPU
 		{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU
 		{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU
 		{4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
--- a/Implab/AbstractEvent.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/AbstractEvent.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -24,13 +24,13 @@
 
         //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
         THandler[] m_handlers;
-        MTQueue<THandler> m_extraHandlers;
+        SimpleAsyncQueue<THandler> m_extraHandlers;
         int m_handlerPointer = -1;
         int m_handlersCommited;
 
         int m_cancelRequest;
         Exception m_cancelationReason;
-        MTQueue<Action<Exception>> m_cancelationHandlers;
+        SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers;
 
 
         #region state managment
@@ -182,7 +182,7 @@
                     }
                 } else {
                     if (slot == RESERVED_HANDLERS_COUNT) {
-                        m_extraHandlers = new MTQueue<THandler>();
+                        m_extraHandlers = new SimpleAsyncQueue<THandler>();
                     } else {
                         while (m_extraHandlers == null)
                             Thread.MemoryBarrier();
@@ -245,7 +245,7 @@
                 handler(CancellationReason);
 
             if (m_cancelationHandlers == null)
-                Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
+                Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
 
             m_cancelationHandlers.Enqueue(handler);
 
--- a/Implab/Automaton/AutomatonTransition.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/Automaton/AutomatonTransition.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -1,7 +1,7 @@
 using System;
 
 namespace Implab.Automaton {
-    public struct AutomatonTransition : IEquatable<AutomatonTransition> {
+    public class AutomatonTransition : IEquatable<AutomatonTransition> {
         public readonly int s1;
         public readonly int s2;
         public readonly int edge;
--- a/Implab/Implab.csproj	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/Implab.csproj	Wed Oct 04 15:44:47 2017 +0300
@@ -1,5 +1,5 @@
 <?xml version="1.0" encoding="utf-8"?>
-<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
     <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
@@ -8,6 +8,7 @@
     <RootNamespace>Implab</RootNamespace>
     <AssemblyName>Implab</AssemblyName>
     <TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
+    <TargetFrameworkProfile />
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
     <DebugSymbols>true</DebugSymbols>
@@ -97,7 +98,7 @@
     <Compile Include="ITaskController.cs" />
     <Compile Include="Parallels\DispatchPool.cs" />
     <Compile Include="Parallels\ArrayTraits.cs" />
-    <Compile Include="Parallels\MTQueue.cs" />
+    <Compile Include="Parallels\SimpleAsyncQueue.cs" />
     <Compile Include="Parallels\WorkerPool.cs" />
     <Compile Include="ProgressInitEventArgs.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
--- a/Implab/Parallels/AsyncQueue.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -7,11 +7,11 @@
 namespace Implab.Parallels {
     public class AsyncQueue<T> : IEnumerable<T> {
         class Chunk {
-            public Chunk next;
+            public volatile Chunk next;
 
-            int m_low;
-            int m_hi;
-            int m_alloc;
+            volatile int m_low;
+            volatile int m_hi;
+            volatile int m_alloc;
             readonly int m_size;
             readonly T[] m_data;
 
@@ -28,12 +28,15 @@
                 m_data[0] = value;
             }
 
-            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+            public Chunk(int size, int allocated) {
                 m_size = size;
-                m_hi = length;
-                m_alloc = alloc;
+                m_hi = allocated;
+                m_alloc = allocated;
                 m_data = new T[size];
-                Array.Copy(data, offset, m_data, 0, length);
+            }
+
+            public void WriteData(T[] data, int offset, int dest, int length) {
+                Array.Copy(data, offset, m_data, dest, length);
             }
 
             public int Low {
@@ -48,31 +51,36 @@
                 get { return m_size; }
             }
 
-            public bool TryEnqueue(T value, out bool extend) {
-                var alloc = Interlocked.Increment(ref m_alloc) - 1;
-
-                if (alloc >= m_size) {
-                    extend = alloc == m_size;
-                    return false;
-                }
-
-                extend = false;
+            public bool TryEnqueue(T value) {
+                int alloc;
+                do {
+                    alloc = m_alloc;
+                    if (alloc >= m_size)
+                        return false;
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
+                
                 m_data[alloc] = value;
 
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
+                SpinWait spin = new SpinWait();
+                // m_hi is volatile
+                while (alloc != m_hi) {
                     // spin wait for commit
+                    spin.SpinOnce();
                 }
+                m_hi = alloc + 1;
+
                 return true;
             }
 
             /// <summary>
             /// Prevents from allocating new space in the chunk and waits for all write operations to complete
             /// </summary>
-            public void Commit() {
-                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
-
-                while (m_hi != actual)
-                    Thread.MemoryBarrier();
+            public void Seal() {
+                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
+                SpinWait spin = new SpinWait();
+                while (m_hi != actual) {
+                    spin.SpinOnce();
+                }
             }
 
             public bool TryDequeue(out T value, out bool recycle) {
@@ -84,44 +92,38 @@
                         recycle = (low == m_size);
                         return false;
                     }
-                } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
+                } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
 
-                recycle = (low == m_size - 1);
+                recycle = (low + 1 == m_size);
                 value = m_data[low];
 
                 return true;
             }
 
-            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
-                //int alloc;
-                //int allocSize;
+            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
+                int alloc;
+                do {
+                    alloc = m_alloc;
+                    if (alloc >= m_size) {
+                        enqueued = 0;
+                        return false;
+                    } else {
+                        enqueued = Math.Min(length, m_size - alloc);
+                    }
+                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
+                
+                Array.Copy(batch, offset, m_data, alloc, enqueued);
 
-                var alloc = Interlocked.Add(ref m_alloc, length) - length;
-                if (alloc > m_size) {
-                    // the chunk is full and someone already
-                    // creating the new one
-                    enqueued = 0; // nothing was added
-                    extend = false; // the caller shouldn't try to extend the queue
-                    return false; // nothing was added
+                SpinWait spin = new SpinWait();
+                while (alloc != m_hi) {
+                    spin.SpinOnce();
                 }
 
-                enqueued = Math.Min(m_size - alloc, length);
-                extend = length > enqueued;
-
-                if (enqueued == 0)
-                    return false;
-
-
-                Array.Copy(batch, offset, m_data, alloc, enqueued);
-
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
-                    // spin wait for commit
-                }
-
+                m_hi = alloc + enqueued;
                 return true;
             }
 
-            public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
+            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
                 int low, hi, batchSize;
 
                 do {
@@ -129,15 +131,14 @@
                     hi = m_hi;
                     if (low >= hi) {
                         dequeued = 0;
-                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        recycle = (low == m_size);
                         return false;
                     }
                     batchSize = Math.Min(hi - low, length);
-                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+                } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
 
-                recycle = (low == m_size - batchSize);
                 dequeued = batchSize;
-
+                recycle = (low + batchSize == m_size);
                 Array.Copy(m_data, low, buffer, offset, batchSize);
 
                 return true;
@@ -149,32 +150,33 @@
         }
 
         public const int DEFAULT_CHUNK_SIZE = 32;
-        public const int MAX_CHUNK_SIZE = 262144;
+        public const int MAX_CHUNK_SIZE = 256;
 
         Chunk m_first;
         Chunk m_last;
 
+        public AsyncQueue() {
+            m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
+        }
+
         /// <summary>
         /// Adds the specified value to the queue.
         /// </summary>
         /// <param name="value">Tha value which will be added to the queue.</param>
-        public virtual void Enqueue(T value) {
+        public void Enqueue(T value) {
             var last = m_last;
-            // spin wait to the new chunk
-            bool extend = true;
-            while (last == null || !last.TryEnqueue(value, out extend)) {
+            SpinWait spin = new SpinWait();
+            while (!last.TryEnqueue(value)) {
                 // try to extend queue
-                if (extend || last == null) {
-                    var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
-                    if (EnqueueChunk(last, chunk))
-                        break; // success! exit!
-                    last = m_last;
+                var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
+                var t = Interlocked.CompareExchange(ref m_last, chunk, last);
+                if (t == last) {
+                    last.next = chunk;
+                    break;
                 } else {
-                    while (last == m_last) {
-                        Thread.MemoryBarrier();
-                    }
-                    last = m_last;
+                    last = t;
                 }
+                spin.SpinOnce();
             }
         }
 
@@ -184,67 +186,54 @@
         /// <param name="data">The buffer which contains the data to be enqueued.</param>
         /// <param name="offset">The offset of the data in the buffer.</param>
         /// <param name="length">The size of the data to read from the buffer.</param>
-        public virtual void EnqueueRange(T[] data, int offset, int length) {
+        public void EnqueueRange(T[] data, int offset, int length) {
             if (data == null)
                 throw new ArgumentNullException("data");
-            if (length == 0)
-                return;
             if (offset < 0)
                 throw new ArgumentOutOfRangeException("offset");
             if (length < 1 || offset + length > data.Length)
                 throw new ArgumentOutOfRangeException("length");
 
-            var last = m_last;
+            while (length > 0) {
+                var last = m_last;
+                int enqueued;
 
-            bool extend;
-            int enqueued;
-
-            while (length > 0) {
-                extend = true;
-                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
                     length -= enqueued;
                     offset += enqueued;
                 }
 
-                if (extend) {
-                    // there was no enough space in the chunk
-                    // or there was no chunks in the queue
+                if (length > 0) {
+                    // we have something to enqueue
 
-                    while (length > 0) {
-
-                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+                    var tail = length % MAX_CHUNK_SIZE;
 
-                        var chunk = new Chunk(
-                            Math.Max(size, DEFAULT_CHUNK_SIZE),
-                            data,
-                            offset,
-                            size,
-                            length // length >= size
-                        );
+                    var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
+
+                    if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
+                        continue; // we wasn't able to catch the writer, roundtrip
 
-                        if (!EnqueueChunk(last, chunk)) {
-                            // looks like the queue has been updated then proceed from the beginning
-                            last = m_last; 
-                            break;
-                        }
+                    // we are lucky
+                    // we can exclusively write our batch, the other writers will continue their work
+
+                    length -= tail;
 
-                        // we have successfully added the new chunk
-                        last = chunk;
-                        length -= size;
-                        offset += size;
-                    }
-                } else {
-                    // we don't need to extend the queue, if we successfully enqueued data
-                    if (length == 0)
-                        break;
-
-                    // if we need to wait while someone is extending the queue
-                    // spinwait
-                    while (last == m_last) {
-                        Thread.MemoryBarrier();
+                    
+                    for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
+                        var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
+                        node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
+                        offset += MAX_CHUNK_SIZE;
+                        // fence last.next is volatile
+                        last.next = node;
+                        last = node;
                     }
 
-                    last = m_last;
+                    if (tail > 0)
+                        chunk.WriteData(data, offset, 0, tail);
+                    
+                    // fence last.next is volatile
+                    last.next = chunk;
+                    return;
                 }
             }
         }
@@ -256,26 +245,21 @@
         /// <param name="value">The value of the dequeued element.</param>
         public bool TryDequeue(out T value) {
             var chunk = m_first;
-            bool recycle;
-            while (chunk != null) {
+            do {
+                bool recycle;
 
                 var result = chunk.TryDequeue(out value, out recycle);
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-                else
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
                     return result; // this chunk is usable and returned actual result
+                }
 
                 if (result) // this chunk is waste but the true result is always actual
                     return true;
-
-                // try again
-                chunk = m_first;
-            }
-
-            // the queue is empty
-            value = default(T);
-            return false;
+            } while (true);
         }
 
         /// <summary>
@@ -295,10 +279,9 @@
                 throw new ArgumentOutOfRangeException("length");
 
             var chunk = m_first;
-            bool recycle;
             dequeued = 0;
-            while (chunk != null) {
-
+            do {
+                bool recycle;
                 int actual;
                 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                     offset += actual;
@@ -306,18 +289,16 @@
                     dequeued += actual;
                 }
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-                else if (actual == 0)
-                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
+                    chunk = null;
+                }
 
                 if (length == 0)
                     return true;
-
-                // we still may dequeue something
-                // try again
-                chunk = m_first;
-            }
+            } while (chunk != null);
 
             return dequeued != 0;
         }
@@ -339,123 +320,81 @@
                 throw new ArgumentOutOfRangeException("length");
 
             var chunk = m_first;
-            bool recycle;
-            dequeued = 0;
-
-            while (chunk != null) {
+            do {
+                bool recycle;
+                chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
 
-                int actual;
-                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
-                    dequeued = actual;
+                if (recycle && chunk.next != null) {
+                    // this chunk is waste
+                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
+                } else {
+                    chunk = null;
                 }
 
-                if (recycle) // this chunk is waste
-                    RecycleFirstChunk(chunk);
-
                 // if we have dequeued any data, then return
                 if (dequeued != 0)
                     return true;
 
-                // we still may dequeue something
-                // try again
-                chunk = m_first;
-            }
+            } while (chunk != null);
 
             return false;
         }
-
-        bool EnqueueChunk(Chunk last, Chunk chunk) {
-            if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
-                return false;
-
-            if (last != null)
-                last.next = chunk;
-            else {
-                m_first = chunk;
-            }
-            return true;
-        }
-
-        void RecycleFirstChunk(Chunk first) {
-            var next = first.next;
-
-            if (first != Interlocked.CompareExchange(ref m_first, next, first))
-                return;
-
-            if (next == null) {
-
-                if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-
-                    // race
-                    // someone already updated the tail, restore the pointer to the queue head
-                    m_first = first;
-                }
-                // the tail is updated
-            }
-        }
+        
 
         public void Clear() {
             // start the new queue
             var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
             do {
-                Thread.MemoryBarrier();
                 var first = m_first;
-                var last = m_last;
-
-                if (last == null) // nothing to clear
-                    return;
-
-                if (first == null || (first.next == null && first != last)) // inconcistency
+                if (first.next == null && first != m_last) {
                     continue;
-
-                // here we will create inconsistency which will force others to spin
-                // and prevent from fetching. chunk.next = null
-                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
-                    continue;// inconsistent
-
-                m_last = chunk;
-
-                return;
-
-            } while(true);
-        }
-
-        public T[] Drain() {
-            // start the new queue
-            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
-
-            do {
-                Thread.MemoryBarrier();
-                var first = m_first;
-                var last = m_last;
-
-                if (last == null)
-                    return new T[0];
-
-                if (first == null || (first.next == null && first != last))
-                    continue;
+                }
 
                 // here we will create inconsistency which will force others to spin
                 // and prevent from fetching. chunk.next = null
                 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
                     continue;// inconsistent
 
-                last = Interlocked.Exchange(ref m_last, chunk);
+                m_last = chunk;
+                return;
+            } while (true);
+        }
+
+        public List<T> Drain() {
+            // start the new queue
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
+
+            do {
+                var first = m_first;
+                // first.next is volatile
+                if (first.next == null) {
+                    if (first != m_last)
+                        continue;
+                    else if (first.Hi == first.Low)
+                        return new List<T>();
+                }
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+                    continue;// inconsistent
+
+                var last = Interlocked.Exchange(ref m_last, chunk);
 
                 return ReadChunks(first, last);
 
-            } while(true);
+            } while (true);
         }
-            
-        static T[] ReadChunks(Chunk chunk, object last) {
+
+        static List<T> ReadChunks(Chunk chunk, object last) {
             var result = new List<T>();
-            var buffer = new T[DEFAULT_CHUNK_SIZE];
+            var buffer = new T[MAX_CHUNK_SIZE];
             int actual;
             bool recycle;
+            SpinWait spin = new SpinWait();
             while (chunk != null) {
                 // ensure all write operations on the chunk are complete
-                chunk.Commit();
+                chunk.Seal();
 
                 // we need to read the chunk using this way
                 // since some client still may completing the dequeue
@@ -467,12 +406,12 @@
                     chunk = null;
                 } else {
                     while (chunk.next == null)
-                        Thread.MemoryBarrier();
+                        spin.SpinOnce();
                     chunk = chunk.next;
                 }
             }
 
-            return result.ToArray();
+            return result;
         }
 
         struct ArraySegmentCollection : ICollection<T> {
@@ -501,7 +440,7 @@
             }
 
             public void CopyTo(T[] array, int arrayIndex) {
-                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
+                Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
             }
 
             public bool Remove(T item) {
--- a/Implab/Parallels/BlockingQueue.cs	Tue Sep 12 19:07:42 2017 +0300
+++ b/Implab/Parallels/BlockingQueue.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -5,13 +5,13 @@
     public class BlockingQueue<T> : AsyncQueue<T> {
         readonly object m_lock = new object();
 
-        public override void Enqueue(T value) {
+        public void EnqueuePulse(T value) {
             base.Enqueue(value);
             lock (m_lock)
                 Monitor.Pulse(m_lock);
         }
 
-        public override void EnqueueRange(T[] data, int offset, int length) {
+        public void EnqueueRangePulse(T[] data, int offset, int length) {
             base.EnqueueRange(data, offset, length);
             if (length > 1)
                 lock (m_lock)
--- a/Implab/Parallels/MTQueue.cs	Tue Sep 12 19:07:42 2017 +0300
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,143 +0,0 @@
-using System.Threading;
-using System.Collections.Generic;
-using System;
-using System.Collections;
-
-namespace Implab.Parallels {
-    public class MTQueue<T> : IEnumerable<T> {
-        class Node {
-            public Node(T value) {
-                this.value = value;
-            }
-            public readonly T value;
-            public Node next;
-        }
-
-        Node m_first;
-        Node m_last;
-
-        public void Enqueue(T value) {
-            Thread.MemoryBarrier();
-
-            var last = m_last;
-            var next = new Node(value);
-
-            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
-            // to ensure that the next node is completely constructed
-            while (last != Interlocked.CompareExchange(ref m_last, next, last))
-                last = m_last;
-
-            if (last != null)
-                last.next = next;
-            else
-                m_first = next;
-        }
-
-        public bool TryDequeue(out T value) {
-            Node first;
-            Node next;
-            value = default(T);
-
-            Thread.MemoryBarrier();
-            do {
-                first = m_first;
-                if (first == null)
-                    return false;
-                next = first.next;
-                if (next == null) {
-                    // this is the last element,
-                    // then try to update the tail
-                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-                        // this is the race condition
-                        if (m_last == null)
-                            // the queue is empty
-                            return false;
-                        // tail has been changed, we need to restart
-                        continue; 
-                    }
-
-                    // tail succesfully updated and first.next will never be changed
-                    // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null
-                    // however the parallel writer may update the m_first since the m_last is null
-
-                    // so we need to fix inconsistency by setting m_first to null or if it has been
-                    // updated by the writer already then we should just to give up
-                    Interlocked.CompareExchange(ref m_first, null, first);
-                    break;
-
-                }
-                if (first == Interlocked.CompareExchange(ref m_first, next, first))
-                    // head succesfully updated
-                    break;
-            } while (true);
-
-            value = first.value;
-            return true;
-        }
-
-        #region IEnumerable implementation
-
-        class Enumerator : IEnumerator<T> {
-            Node m_current;
-            Node m_first;
-
-            public Enumerator(Node first) {
-                m_first = first;
-            }
-
-            #region IEnumerator implementation
-
-            public bool MoveNext() {
-                m_current = m_current == null ? m_first : m_current.next;
-                return m_current != null;
-            }
-
-            public void Reset() {
-                m_current = null;
-            }
-
-            object IEnumerator.Current {
-                get {
-                    if (m_current == null)
-                        throw new InvalidOperationException();
-                    return m_current.value;
-                }
-            }
-
-            #endregion
-
-            #region IDisposable implementation
-
-            public void Dispose() {
-            }
-
-            #endregion
-
-            #region IEnumerator implementation
-
-            public T Current {
-                get {
-                    if (m_current == null)
-                        throw new InvalidOperationException();
-                    return m_current.value;
-                }
-            }
-
-            #endregion
-        }
-
-        public IEnumerator<T> GetEnumerator() {
-            return new Enumerator(m_first);
-        }
-
-        #endregion
-
-        #region IEnumerable implementation
-
-        IEnumerator IEnumerable.GetEnumerator() {
-            return GetEnumerator();
-        }
-
-        #endregion
-    }
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/SimpleAsyncQueue.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -0,0 +1,131 @@
+using System.Threading;
+using System.Collections.Generic;
+using System;
+using System.Collections;
+
+namespace Implab.Parallels {
+    public class SimpleAsyncQueue<T> : IEnumerable<T> {
+        class Node {
+            public Node(T value) {
+                this.value = value;
+            }
+            public readonly T value;
+            public volatile Node next;
+        }
+
+        // the reader and the writer are mainteined completely independent,
+        // the reader can read next item when m_first.next is not null
+        // the writer creates the a new node, moves m_last to this node and
+        // only after that restores the reference from the previous node
+        // making available the reader to read the new node.
+
+        Node m_first; // position on the node which is already read
+        Node m_last; // position on the node which is already written
+
+        public SimpleAsyncQueue() {
+            m_first = m_last = new Node(default(T));
+        }
+
+        public void Enqueue(T value) {
+            var next = new Node(value);
+
+            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
+            // to ensure that the next node is completely constructed
+            var last = Interlocked.Exchange(ref m_last, next);
+
+            // release-fence
+            last.next = next;
+            
+        }
+
+        public bool TryDequeue(out T value) {
+            Node first;
+            Node next;
+
+            Thread.MemoryBarrier(); // ensure m_first is fresh
+            SpinWait spin = new SpinWait();
+            do {
+                first = m_first;
+                // aquire-fence
+                next = first.next;
+                if (next == null) {
+                    value = default(T);
+                    return false;
+                }
+                
+                if (first == Interlocked.CompareExchange(ref m_first, next, first))
+                    // head succesfully updated
+                    break;
+                spin.SpinOnce();
+            } while (true);
+
+            value = next.value;
+            return true;
+        }
+
+        #region IEnumerable implementation
+
+        class Enumerator : IEnumerator<T> {
+            Node m_current;
+            Node m_first;
+
+            public Enumerator(Node first) {
+                m_first = first;
+            }
+
+            #region IEnumerator implementation
+
+            public bool MoveNext() {
+                m_current = m_current == null ? m_first : m_current.next;
+                return m_current != null;
+            }
+
+            public void Reset() {
+                m_current = null;
+            }
+
+            object IEnumerator.Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.value;
+                }
+            }
+
+            #endregion
+
+            #region IDisposable implementation
+
+            public void Dispose() {
+            }
+
+            #endregion
+
+            #region IEnumerator implementation
+
+            public T Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.value;
+                }
+            }
+
+            #endregion
+        }
+
+        public IEnumerator<T> GetEnumerator() {
+            return new Enumerator(m_first);
+        }
+
+        #endregion
+
+        #region IEnumerable implementation
+
+        IEnumerator IEnumerable.GetEnumerator() {
+            return GetEnumerator();
+        }
+
+        #endregion
+    }
+}