Mercurial > pub > ImplabNet
changeset 233:d6fe09f5592c v2
Improved AsyncQueue
Removed ImplabFx
author | cin |
---|---|
date | Wed, 04 Oct 2017 15:44:47 +0300 |
parents | 5f7a3e1d32b9 |
children | 8dd666e6b6bf |
files | Implab.Playground/App.config Implab.Playground/Implab.Playground.csproj Implab.Playground/Program.cs Implab.Playground2.psess Implab.Test/AsyncTests.cs Implab.sln Implab/AbstractEvent.cs Implab/Automaton/AutomatonTransition.cs Implab/Implab.csproj Implab/Parallels/AsyncQueue.cs Implab/Parallels/BlockingQueue.cs Implab/Parallels/MTQueue.cs Implab/Parallels/SimpleAsyncQueue.cs |
diffstat | 13 files changed, 501 insertions(+), 431 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Playground/App.config Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.Playground/App.config Wed Oct 04 15:44:47 2017 +0300 @@ -1,6 +1,6 @@ -<?xml version="1.0" encoding="utf-8" ?> +<?xml version="1.0" encoding="utf-8"?> <configuration> <startup> - <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.2" /> + <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5"/> </startup> -</configuration> \ No newline at end of file +</configuration>
--- a/Implab.Playground/Implab.Playground.csproj Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.Playground/Implab.Playground.csproj Wed Oct 04 15:44:47 2017 +0300 @@ -9,9 +9,10 @@ <AppDesignerFolder>Properties</AppDesignerFolder> <RootNamespace>Implab.Playground</RootNamespace> <AssemblyName>Implab.Playground</AssemblyName> - <TargetFrameworkVersion>v4.5.2</TargetFrameworkVersion> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> <FileAlignment>512</FileAlignment> <AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects> + <TargetFrameworkProfile /> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <PlatformTarget>AnyCPU</PlatformTarget>
--- a/Implab.Playground/Program.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.Playground/Program.cs Wed Oct 04 15:44:47 2017 +0300 @@ -1,10 +1,13 @@ using Implab.Formats.Json; +using Implab.Parallels; using Implab.Xml; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using System.Xml; using System.Xml.Serialization; @@ -12,29 +15,167 @@ namespace Implab.Playground { public class Program { - [XmlRoot(Namespace = "XmlSimpleData")] - public class XmlSimpleModel { - [XmlElement] - public string Name { get; set; } + static void EnqueueRange<T>(ConcurrentQueue<T> q, T[] data, int offset, int len) { + for (var i = offset; i < offset + len; i++) + q.Enqueue(data[i]); + } + + static bool TryDequeueRange<T>(ConcurrentQueue<T> q,T[] buffer,int offset, int len, out int actual) { + actual = 0; + T res; + while(q.TryDequeue(out res)) { + buffer[offset + actual] = res; + actual++; + if (actual == len) + break; + } + return actual != 0; + } + + static void EnqueueRange<T>(SimpleAsyncQueue<T> q, T[] data, int offset, int len) { + for (var i = offset; i < offset + len; i++) + q.Enqueue(data[i]); + } - [XmlElement] - public int Order { get; set; } + static bool TryDequeueRange<T>(SimpleAsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) { + actual = 0; + T res; + while (q.TryDequeue(out res)) { + buffer[offset + actual] = res; + actual++; + if (actual == len) + break; + } + return actual != 0; + } - [XmlElement] - public string[] Items { get; set; } + /* + static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) { + for (var i = offset; i < offset + len; i++) + q.Enqueue(data[i]); + } + static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) { + actual = 0; + T res; + while (q.TryDequeue(out res)) { + buffer[offset + actual] = res; + actual++; + if (actual == len) + break; + } + return actual != 0; + } + */ + + static void EnqueueRange<T>(AsyncQueue<T> q, T[] data, int offset, int len) { + q.EnqueueRange(data, offset, len); } + static bool TryDequeueRange<T>(AsyncQueue<T> q, T[] buffer, int offset, int len, out int actual) { + return q.TryDequeueRange(buffer, offset, len, out actual); + } + + static void Main(string[] args) { - var model = new XmlSimpleModel { - Name = "Tablet", - Order = 10, - Items = new string[] { "z1", "z2", "z3" } - }; - var doc = SerializationHelpers.SerializeAsXmlDocument(model); - - var m2 = SerializationHelpers.DeserializeFromXmlNode<XmlSimpleModel>(doc.DocumentElement); + //var queue = new ConcurrentQueue<int>(); + var queue = new AsyncQueue<int>(); + //var queue = new SimpleAsyncQueue<int>(); + + const int wBatch = 32; + const long wCount = 1000000; + const long total = wBatch * wCount * 3; + + long r1 = 0, r2 = 0, r3 = 0; + const int rBatch = 1000; + long read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for (int i = 0; i < wBatch; i++) + buffer[i] = 1; + + for (int i = 0; i < wCount; i++) + EnqueueRange(queue, buffer, 0, wBatch); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for (int i = 0; i < wBatch; i++) + buffer[i] = 1; + + for (int i = 0; i < wCount; i++) + EnqueueRange(queue, buffer, 0, wBatch); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for (int i = 0; i < wBatch; i++) + buffer[i] = 1; + + for (int i = 0; i < wCount; i++) + EnqueueRange(queue, buffer, 0, wBatch); + Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[rBatch]; + + while (read < total) { + int actual; + if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) { + for (int i = 0; i < actual; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); + }/*, + () => { + var buffer = new int[rBatch]; + + while (read < total) { + int actual; + if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) { + for (int i = 0; i < actual; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); + }*//*, + () => { + var buffer = new int[rBatch]; + + while (read < total) { + int actual; + if (TryDequeueRange(queue, buffer, 0, rBatch, out actual)) { + for (int i = 0; i < actual; i++) + r3 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #3: {0} ms", Environment.TickCount - t1); + }*/ + ) + .PromiseAll() + .Join(); + + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2 + r3, + total + ); Console.WriteLine("done"); }
--- a/Implab.Playground2.psess Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.Playground2.psess Wed Oct 04 15:44:47 2017 +0300 @@ -48,8 +48,8 @@ <LaunchProject>true</LaunchProject> <OverrideProjectSettings>false</OverrideProjectSettings> <LaunchMethod>Executable</LaunchMethod> - <ExecutablePath>Implab.Playground\bin\Release\Implab.Playground.exe</ExecutablePath> - <StartupDirectory>Implab.Playground\bin\Release\</StartupDirectory> + <ExecutablePath>Implab.Playground\bin\Debug\Implab.Playground.exe</ExecutablePath> + <StartupDirectory>Implab.Playground\bin\Debug\</StartupDirectory> <Arguments> </Arguments> <NetAppHost>IIS</NetAppHost> @@ -67,9 +67,4 @@ <ProjName>Implab.Playground</ProjName> </ProjBinary> </Binaries> - <Launches> - <ProjBinary> - <Path>:PB:{100DFEB0-75BE-436F-ADDF-1F46EF433F46}|Implab.Playground\Implab.Playground.csproj</Path> - </ProjBinary> - </Launches> </VSPerformanceSession> \ No newline at end of file
--- a/Implab.Test/AsyncTests.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.Test/AsyncTests.cs Wed Oct 04 15:44:47 2017 +0300 @@ -222,9 +222,9 @@ [TestMethod] public void MTQueueTest() { - var queue = new MTQueue<int>(); + var queue = new SimpleAsyncQueue<int>(); int res; - + queue.Enqueue(10); Assert.IsTrue(queue.TryDequeue(out res)); Assert.AreEqual(10, res); @@ -242,8 +242,9 @@ int readers = 0; var stop = new ManualResetEvent(false); int total = 0; + var ticks = Environment.TickCount; - const int itemsPerWriter = 10000; + const int itemsPerWriter = 1000000; const int writersCount = 10; for (int i = 0; i < writersCount; i++) { @@ -278,7 +279,9 @@ stop.WaitOne(); - Assert.AreEqual(100000, total); + Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks); + + Assert.AreEqual(itemsPerWriter * writersCount, total); } [TestMethod] @@ -509,13 +512,12 @@ public void AsyncQueueDrainTest() { var queue = new AsyncQueue<int>(); - const int wBatch = 11; + const int wBatch = 32; const int wCount = 200000; const int total = wBatch * wCount * 3; const int summ = wBatch * wCount * 3; int r1 = 0, r2 = 0; - const int rBatch = 11; int read = 0; var t1 = Environment.TickCount; @@ -531,8 +533,12 @@ Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { - for(int i =0; i < wCount * wBatch; i++) - queue.Enqueue(1); + var buffer = new int[wBatch]; + for (int i = 0; i < wBatch; i++) + buffer[i] = 1; + + for (int i = 0; i < wCount; i++) + queue.EnqueueRange(buffer, 0, wBatch); Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { @@ -572,25 +578,34 @@ },*/ () => { var count = 0; - while(read < total) { + int emptyDrains = 0; + + while (read < total) { var buffer = queue.Drain(); - for(int i=0; i< buffer.Length; i++) + if (buffer.Count == 0) + emptyDrains++; + for(int i=0; i< buffer.Count; i++) r1 += buffer[i]; - Interlocked.Add(ref read, buffer.Length); - count += buffer.Length; + Interlocked.Add(ref read, buffer.Count); + count += buffer.Count; } - Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); + Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains); }, () => { - var count = 0; - while(read < total) { + var count = 0; + int emptyDrains = 0; + + while (read < total) { var buffer = queue.Drain(); - for(int i=0; i< buffer.Length; i++) + if (buffer.Count == 0) + emptyDrains++; + + for (int i=0; i< buffer.Count; i++) r2 += buffer[i]; - Interlocked.Add(ref read, buffer.Length); - count += buffer.Length; + Interlocked.Add(ref read, buffer.Count); + count += buffer.Count; } - Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); + Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains); } ) .PromiseAll()
--- a/Implab.sln Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab.sln Wed Oct 04 15:44:47 2017 +0300 @@ -14,8 +14,6 @@ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test", "Implab.Test\Implab.Test.csproj", "{63F92C0C-61BF-48C0-A377-8D67C3C661D0}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx", "Implab.Fx\Implab.Fx.csproj", "{06E706F8-6881-43EB-927E-FFC503AF6ABC}" -EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Format.Test", "Implab.Format.Test\Implab.Format.Test.csproj", "{4D364996-7ECD-4193-8F90-F223FFEA49DA}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Playground", "Implab.Playground\Implab.Playground.csproj", "{100DFEB0-75BE-436F-ADDF-1F46EF433F46}" @@ -47,14 +45,6 @@ {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.ActiveCfg = Release|Any CPU {63F92C0C-61BF-48C0-A377-8D67C3C661D0}.Release|Any CPU.Build.0 = Release|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.ActiveCfg = Debug 4.5|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug 4.5|Any CPU.Build.0 = Debug 4.5|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Debug|Any CPU.Build.0 = Debug|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.ActiveCfg = Release 4.5|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release 4.5|Any CPU.Build.0 = Release 4.5|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.ActiveCfg = Release|Any CPU - {06E706F8-6881-43EB-927E-FFC503AF6ABC}.Release|Any CPU.Build.0 = Release|Any CPU {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.ActiveCfg = Debug|Any CPU {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug 4.5|Any CPU.Build.0 = Debug|Any CPU {4D364996-7ECD-4193-8F90-F223FFEA49DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
--- a/Implab/AbstractEvent.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/AbstractEvent.cs Wed Oct 04 15:44:47 2017 +0300 @@ -24,13 +24,13 @@ //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; THandler[] m_handlers; - MTQueue<THandler> m_extraHandlers; + SimpleAsyncQueue<THandler> m_extraHandlers; int m_handlerPointer = -1; int m_handlersCommited; int m_cancelRequest; Exception m_cancelationReason; - MTQueue<Action<Exception>> m_cancelationHandlers; + SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers; #region state managment @@ -182,7 +182,7 @@ } } else { if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new MTQueue<THandler>(); + m_extraHandlers = new SimpleAsyncQueue<THandler>(); } else { while (m_extraHandlers == null) Thread.MemoryBarrier(); @@ -245,7 +245,7 @@ handler(CancellationReason); if (m_cancelationHandlers == null) - Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); + Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); m_cancelationHandlers.Enqueue(handler);
--- a/Implab/Automaton/AutomatonTransition.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/Automaton/AutomatonTransition.cs Wed Oct 04 15:44:47 2017 +0300 @@ -1,7 +1,7 @@ using System; namespace Implab.Automaton { - public struct AutomatonTransition : IEquatable<AutomatonTransition> { + public class AutomatonTransition : IEquatable<AutomatonTransition> { public readonly int s1; public readonly int s2; public readonly int edge;
--- a/Implab/Implab.csproj Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/Implab.csproj Wed Oct 04 15:44:47 2017 +0300 @@ -1,5 +1,5 @@ <?xml version="1.0" encoding="utf-8"?> -<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> +<Project DefaultTargets="Build" ToolsVersion="12.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> <PropertyGroup> <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> @@ -8,6 +8,7 @@ <RootNamespace>Implab</RootNamespace> <AssemblyName>Implab</AssemblyName> <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <TargetFrameworkProfile /> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> @@ -97,7 +98,7 @@ <Compile Include="ITaskController.cs" /> <Compile Include="Parallels\DispatchPool.cs" /> <Compile Include="Parallels\ArrayTraits.cs" /> - <Compile Include="Parallels\MTQueue.cs" /> + <Compile Include="Parallels\SimpleAsyncQueue.cs" /> <Compile Include="Parallels\WorkerPool.cs" /> <Compile Include="ProgressInitEventArgs.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
--- a/Implab/Parallels/AsyncQueue.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Wed Oct 04 15:44:47 2017 +0300 @@ -7,11 +7,11 @@ namespace Implab.Parallels { public class AsyncQueue<T> : IEnumerable<T> { class Chunk { - public Chunk next; + public volatile Chunk next; - int m_low; - int m_hi; - int m_alloc; + volatile int m_low; + volatile int m_hi; + volatile int m_alloc; readonly int m_size; readonly T[] m_data; @@ -28,12 +28,15 @@ m_data[0] = value; } - public Chunk(int size, T[] data, int offset, int length, int alloc) { + public Chunk(int size, int allocated) { m_size = size; - m_hi = length; - m_alloc = alloc; + m_hi = allocated; + m_alloc = allocated; m_data = new T[size]; - Array.Copy(data, offset, m_data, 0, length); + } + + public void WriteData(T[] data, int offset, int dest, int length) { + Array.Copy(data, offset, m_data, dest, length); } public int Low { @@ -48,31 +51,36 @@ get { return m_size; } } - public bool TryEnqueue(T value, out bool extend) { - var alloc = Interlocked.Increment(ref m_alloc) - 1; - - if (alloc >= m_size) { - extend = alloc == m_size; - return false; - } - - extend = false; + public bool TryEnqueue(T value) { + int alloc; + do { + alloc = m_alloc; + if (alloc >= m_size) + return false; + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); + m_data[alloc] = value; - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + SpinWait spin = new SpinWait(); + // m_hi is volatile + while (alloc != m_hi) { // spin wait for commit + spin.SpinOnce(); } + m_hi = alloc + 1; + return true; } /// <summary> /// Prevents from allocating new space in the chunk and waits for all write operations to complete /// </summary> - public void Commit() { - var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); - - while (m_hi != actual) - Thread.MemoryBarrier(); + public void Seal() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); + SpinWait spin = new SpinWait(); + while (m_hi != actual) { + spin.SpinOnce(); + } } public bool TryDequeue(out T value, out bool recycle) { @@ -84,44 +92,38 @@ recycle = (low == m_size); return false; } - } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low)); - recycle = (low == m_size - 1); + recycle = (low + 1 == m_size); value = m_data[low]; return true; } - public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { - //int alloc; - //int allocSize; + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) { + int alloc; + do { + alloc = m_alloc; + if (alloc >= m_size) { + enqueued = 0; + return false; + } else { + enqueued = Math.Min(length, m_size - alloc); + } + } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); + + Array.Copy(batch, offset, m_data, alloc, enqueued); - var alloc = Interlocked.Add(ref m_alloc, length) - length; - if (alloc > m_size) { - // the chunk is full and someone already - // creating the new one - enqueued = 0; // nothing was added - extend = false; // the caller shouldn't try to extend the queue - return false; // nothing was added + SpinWait spin = new SpinWait(); + while (alloc != m_hi) { + spin.SpinOnce(); } - enqueued = Math.Min(m_size - alloc, length); - extend = length > enqueued; - - if (enqueued == 0) - return false; - - - Array.Copy(batch, offset, m_data, alloc, enqueued); - - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { - // spin wait for commit - } - + m_hi = alloc + enqueued; return true; } - public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { + public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { int low, hi, batchSize; do { @@ -129,15 +131,14 @@ hi = m_hi; if (low >= hi) { dequeued = 0; - recycle = (low == m_size); // recycling could be restarted and we need to signal again + recycle = (low == m_size); return false; } batchSize = Math.Min(hi - low, length); - } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); + } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); - recycle = (low == m_size - batchSize); dequeued = batchSize; - + recycle = (low + batchSize == m_size); Array.Copy(m_data, low, buffer, offset, batchSize); return true; @@ -149,32 +150,33 @@ } public const int DEFAULT_CHUNK_SIZE = 32; - public const int MAX_CHUNK_SIZE = 262144; + public const int MAX_CHUNK_SIZE = 256; Chunk m_first; Chunk m_last; + public AsyncQueue() { + m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE); + } + /// <summary> /// Adds the specified value to the queue. /// </summary> /// <param name="value">Tha value which will be added to the queue.</param> - public virtual void Enqueue(T value) { + public void Enqueue(T value) { var last = m_last; - // spin wait to the new chunk - bool extend = true; - while (last == null || !last.TryEnqueue(value, out extend)) { + SpinWait spin = new SpinWait(); + while (!last.TryEnqueue(value)) { // try to extend queue - if (extend || last == null) { - var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); - if (EnqueueChunk(last, chunk)) - break; // success! exit! - last = m_last; + var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); + var t = Interlocked.CompareExchange(ref m_last, chunk, last); + if (t == last) { + last.next = chunk; + break; } else { - while (last == m_last) { - Thread.MemoryBarrier(); - } - last = m_last; + last = t; } + spin.SpinOnce(); } } @@ -184,67 +186,54 @@ /// <param name="data">The buffer which contains the data to be enqueued.</param> /// <param name="offset">The offset of the data in the buffer.</param> /// <param name="length">The size of the data to read from the buffer.</param> - public virtual void EnqueueRange(T[] data, int offset, int length) { + public void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); - if (length == 0) - return; if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (length < 1 || offset + length > data.Length) throw new ArgumentOutOfRangeException("length"); - var last = m_last; + while (length > 0) { + var last = m_last; + int enqueued; - bool extend; - int enqueued; - - while (length > 0) { - extend = true; - if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { + if (last.TryEnqueueBatch(data, offset, length, out enqueued)) { length -= enqueued; offset += enqueued; } - if (extend) { - // there was no enough space in the chunk - // or there was no chunks in the queue + if (length > 0) { + // we have something to enqueue - while (length > 0) { - - var size = Math.Min(length, MAX_CHUNK_SIZE); + var tail = length % MAX_CHUNK_SIZE; - var chunk = new Chunk( - Math.Max(size, DEFAULT_CHUNK_SIZE), - data, - offset, - size, - length // length >= size - ); + var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail); + + if (last != Interlocked.CompareExchange(ref m_last, chunk, last)) + continue; // we wasn't able to catch the writer, roundtrip - if (!EnqueueChunk(last, chunk)) { - // looks like the queue has been updated then proceed from the beginning - last = m_last; - break; - } + // we are lucky + // we can exclusively write our batch, the other writers will continue their work + + length -= tail; - // we have successfully added the new chunk - last = chunk; - length -= size; - offset += size; - } - } else { - // we don't need to extend the queue, if we successfully enqueued data - if (length == 0) - break; - - // if we need to wait while someone is extending the queue - // spinwait - while (last == m_last) { - Thread.MemoryBarrier(); + + for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) { + var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE); + node.WriteData(data, offset, 0, MAX_CHUNK_SIZE); + offset += MAX_CHUNK_SIZE; + // fence last.next is volatile + last.next = node; + last = node; } - last = m_last; + if (tail > 0) + chunk.WriteData(data, offset, 0, tail); + + // fence last.next is volatile + last.next = chunk; + return; } } } @@ -256,26 +245,21 @@ /// <param name="value">The value of the dequeued element.</param> public bool TryDequeue(out T value) { var chunk = m_first; - bool recycle; - while (chunk != null) { + do { + bool recycle; var result = chunk.TryDequeue(out value, out recycle); - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - else + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { return result; // this chunk is usable and returned actual result + } if (result) // this chunk is waste but the true result is always actual return true; - - // try again - chunk = m_first; - } - - // the queue is empty - value = default(T); - return false; + } while (true); } /// <summary> @@ -295,10 +279,9 @@ throw new ArgumentOutOfRangeException("length"); var chunk = m_first; - bool recycle; dequeued = 0; - while (chunk != null) { - + do { + bool recycle; int actual; if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { offset += actual; @@ -306,18 +289,16 @@ dequeued += actual; } - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - else if (actual == 0) - break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { + chunk = null; + } if (length == 0) return true; - - // we still may dequeue something - // try again - chunk = m_first; - } + } while (chunk != null); return dequeued != 0; } @@ -339,123 +320,81 @@ throw new ArgumentOutOfRangeException("length"); var chunk = m_first; - bool recycle; - dequeued = 0; - - while (chunk != null) { + do { + bool recycle; + chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle); - int actual; - if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { - dequeued = actual; + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { + chunk = null; } - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - // if we have dequeued any data, then return if (dequeued != 0) return true; - // we still may dequeue something - // try again - chunk = m_first; - } + } while (chunk != null); return false; } - - bool EnqueueChunk(Chunk last, Chunk chunk) { - if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) - return false; - - if (last != null) - last.next = chunk; - else { - m_first = chunk; - } - return true; - } - - void RecycleFirstChunk(Chunk first) { - var next = first.next; - - if (first != Interlocked.CompareExchange(ref m_first, next, first)) - return; - - if (next == null) { - - if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - - // race - // someone already updated the tail, restore the pointer to the queue head - m_first = first; - } - // the tail is updated - } - } + public void Clear() { // start the new queue var chunk = new Chunk(DEFAULT_CHUNK_SIZE); - do { - Thread.MemoryBarrier(); var first = m_first; - var last = m_last; - - if (last == null) // nothing to clear - return; - - if (first == null || (first.next == null && first != last)) // inconcistency + if (first.next == null && first != m_last) { continue; - - // here we will create inconsistency which will force others to spin - // and prevent from fetching. chunk.next = null - if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) - continue;// inconsistent - - m_last = chunk; - - return; - - } while(true); - } - - public T[] Drain() { - // start the new queue - var chunk = new Chunk(DEFAULT_CHUNK_SIZE); - - do { - Thread.MemoryBarrier(); - var first = m_first; - var last = m_last; - - if (last == null) - return new T[0]; - - if (first == null || (first.next == null && first != last)) - continue; + } // here we will create inconsistency which will force others to spin // and prevent from fetching. chunk.next = null if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) continue;// inconsistent - last = Interlocked.Exchange(ref m_last, chunk); + m_last = chunk; + return; + } while (true); + } + + public List<T> Drain() { + // start the new queue + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); + + do { + var first = m_first; + // first.next is volatile + if (first.next == null) { + if (first != m_last) + continue; + else if (first.Hi == first.Low) + return new List<T>(); + } + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + var last = Interlocked.Exchange(ref m_last, chunk); return ReadChunks(first, last); - } while(true); + } while (true); } - - static T[] ReadChunks(Chunk chunk, object last) { + + static List<T> ReadChunks(Chunk chunk, object last) { var result = new List<T>(); - var buffer = new T[DEFAULT_CHUNK_SIZE]; + var buffer = new T[MAX_CHUNK_SIZE]; int actual; bool recycle; + SpinWait spin = new SpinWait(); while (chunk != null) { // ensure all write operations on the chunk are complete - chunk.Commit(); + chunk.Seal(); // we need to read the chunk using this way // since some client still may completing the dequeue @@ -467,12 +406,12 @@ chunk = null; } else { while (chunk.next == null) - Thread.MemoryBarrier(); + spin.SpinOnce(); chunk = chunk.next; } } - return result.ToArray(); + return result; } struct ArraySegmentCollection : ICollection<T> { @@ -501,7 +440,7 @@ } public void CopyTo(T[] array, int arrayIndex) { - Array.Copy(m_data,m_offset,array,arrayIndex, m_length); + Array.Copy(m_data, m_offset, array, arrayIndex, m_length); } public bool Remove(T item) {
--- a/Implab/Parallels/BlockingQueue.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/Parallels/BlockingQueue.cs Wed Oct 04 15:44:47 2017 +0300 @@ -5,13 +5,13 @@ public class BlockingQueue<T> : AsyncQueue<T> { readonly object m_lock = new object(); - public override void Enqueue(T value) { + public void EnqueuePulse(T value) { base.Enqueue(value); lock (m_lock) Monitor.Pulse(m_lock); } - public override void EnqueueRange(T[] data, int offset, int length) { + public void EnqueueRangePulse(T[] data, int offset, int length) { base.EnqueueRange(data, offset, length); if (length > 1) lock (m_lock)
--- a/Implab/Parallels/MTQueue.cs Tue Sep 12 19:07:42 2017 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,143 +0,0 @@ -using System.Threading; -using System.Collections.Generic; -using System; -using System.Collections; - -namespace Implab.Parallels { - public class MTQueue<T> : IEnumerable<T> { - class Node { - public Node(T value) { - this.value = value; - } - public readonly T value; - public Node next; - } - - Node m_first; - Node m_last; - - public void Enqueue(T value) { - Thread.MemoryBarrier(); - - var last = m_last; - var next = new Node(value); - - // Interlocaked.CompareExchange implies Thread.MemoryBarrier(); - // to ensure that the next node is completely constructed - while (last != Interlocked.CompareExchange(ref m_last, next, last)) - last = m_last; - - if (last != null) - last.next = next; - else - m_first = next; - } - - public bool TryDequeue(out T value) { - Node first; - Node next; - value = default(T); - - Thread.MemoryBarrier(); - do { - first = m_first; - if (first == null) - return false; - next = first.next; - if (next == null) { - // this is the last element, - // then try to update the tail - if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - // this is the race condition - if (m_last == null) - // the queue is empty - return false; - // tail has been changed, we need to restart - continue; - } - - // tail succesfully updated and first.next will never be changed - // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null - // however the parallel writer may update the m_first since the m_last is null - - // so we need to fix inconsistency by setting m_first to null or if it has been - // updated by the writer already then we should just to give up - Interlocked.CompareExchange(ref m_first, null, first); - break; - - } - if (first == Interlocked.CompareExchange(ref m_first, next, first)) - // head succesfully updated - break; - } while (true); - - value = first.value; - return true; - } - - #region IEnumerable implementation - - class Enumerator : IEnumerator<T> { - Node m_current; - Node m_first; - - public Enumerator(Node first) { - m_first = first; - } - - #region IEnumerator implementation - - public bool MoveNext() { - m_current = m_current == null ? m_first : m_current.next; - return m_current != null; - } - - public void Reset() { - m_current = null; - } - - object IEnumerator.Current { - get { - if (m_current == null) - throw new InvalidOperationException(); - return m_current.value; - } - } - - #endregion - - #region IDisposable implementation - - public void Dispose() { - } - - #endregion - - #region IEnumerator implementation - - public T Current { - get { - if (m_current == null) - throw new InvalidOperationException(); - return m_current.value; - } - } - - #endregion - } - - public IEnumerator<T> GetEnumerator() { - return new Enumerator(m_first); - } - - #endregion - - #region IEnumerable implementation - - IEnumerator IEnumerable.GetEnumerator() { - return GetEnumerator(); - } - - #endregion - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/SimpleAsyncQueue.cs Wed Oct 04 15:44:47 2017 +0300 @@ -0,0 +1,131 @@ +using System.Threading; +using System.Collections.Generic; +using System; +using System.Collections; + +namespace Implab.Parallels { + public class SimpleAsyncQueue<T> : IEnumerable<T> { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public volatile Node next; + } + + // the reader and the writer are mainteined completely independent, + // the reader can read next item when m_first.next is not null + // the writer creates the a new node, moves m_last to this node and + // only after that restores the reference from the previous node + // making available the reader to read the new node. + + Node m_first; // position on the node which is already read + Node m_last; // position on the node which is already written + + public SimpleAsyncQueue() { + m_first = m_last = new Node(default(T)); + } + + public void Enqueue(T value) { + var next = new Node(value); + + // Interlocaked.CompareExchange implies Thread.MemoryBarrier(); + // to ensure that the next node is completely constructed + var last = Interlocked.Exchange(ref m_last, next); + + // release-fence + last.next = next; + + } + + public bool TryDequeue(out T value) { + Node first; + Node next; + + Thread.MemoryBarrier(); // ensure m_first is fresh + SpinWait spin = new SpinWait(); + do { + first = m_first; + // aquire-fence + next = first.next; + if (next == null) { + value = default(T); + return false; + } + + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + spin.SpinOnce(); + } while (true); + + value = next.value; + return true; + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator<T> { + Node m_current; + Node m_first; + + public Enumerator(Node first) { + m_first = first; + } + + #region IEnumerator implementation + + public bool MoveNext() { + m_current = m_current == null ? m_first : m_current.next; + return m_current != null; + } + + public void Reset() { + m_current = null; + } + + object IEnumerator.Current { + get { + if (m_current == null) + throw new InvalidOperationException(); + return m_current.value; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public T Current { + get { + if (m_current == null) + throw new InvalidOperationException(); + return m_current.value; + } + } + + #endregion + } + + public IEnumerator<T> GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +}