Mercurial > pub > ImplabNet
changeset 120:f1b897999260 v2
improved asyncpool usability
working on batch operations on asyncqueue
author | cin |
---|---|
date | Mon, 12 Jan 2015 05:19:52 +0300 |
parents | 2573b562e328 |
children | 62d2f1e98c4e |
files | Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs MonoPlay/Program.cs |
diffstat | 3 files changed, 66 insertions(+), 30 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncPool.cs Sun Jan 11 19:13:02 2015 +0300 +++ b/Implab/Parallels/AsyncPool.cs Mon Jan 12 05:19:52 2015 +0300 @@ -1,6 +1,7 @@ using Implab.Diagnostics; using System; using System.Threading; +using System.Linq; namespace Implab.Parallels { /// <summary> @@ -73,5 +74,13 @@ return p; } + + public static IPromise[] ThreadRun(params Action[] func) { + return func.Select(f => InvokeNewThread(f)).ToArray(); + } + + public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) { + return func.Select(f => InvokeNewThread(f)).ToArray(); + } } }
--- a/Implab/Parallels/AsyncQueue.cs Sun Jan 11 19:13:02 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Jan 12 05:19:52 2015 +0300 @@ -36,19 +36,14 @@ } public bool TryEnqueue(T value,out bool extend) { - extend = false; - int alloc; - do { - alloc = m_alloc; - if (alloc > m_size) - return false; - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + var alloc = Interlocked.Increment(ref m_alloc) - 1; - if (alloc == m_size) { - extend = true; + if (alloc >= m_size) { + extend = alloc == m_size; return false; } - + + extend = false; m_data[alloc] = value; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { @@ -74,6 +69,38 @@ return true; } + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { + int alloc; + int allocSize; + + do { + alloc = m_alloc; + + if (alloc > m_size) { + enqueued = 0; + extend = false; + return false; + } + + allocSize = Math.Min(m_size - m_alloc, length); + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); + + if (alloc == m_size) { + enqueued = 0; + extend = true; + return false; + } + + Array.Copy(batch, offset, m_data, alloc, allocSize); + enqueued = allocSize; + extend = false; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + // spin wait for commit + } + return true; + } + public T GetAt(int pos) { return m_data[pos]; }
--- a/MonoPlay/Program.cs Sun Jan 11 19:13:02 2015 +0300 +++ b/MonoPlay/Program.cs Mon Jan 12 05:19:52 2015 +0300 @@ -19,30 +19,30 @@ var t1 = Environment.TickCount; - new [] { - AsyncPool.InvokeNewThread(() => { + AsyncPool.ThreadRun( + () => { for (var i = 0; i < count; i++) q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (var i = 0; i < count; i++) q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { int temp = 0; int i = 0; while (i < count) if (q1.TryDequeue(out temp)) i++; - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { int temp = 0; int i = 0; while (i < count) if (q1.TryDequeue(out temp)) i++; - }) - } + } + ) .Combine() .Join(); @@ -65,18 +65,18 @@ t1 = Environment.TickCount; - new [] { - AsyncPool.InvokeNewThread(() => { + AsyncPool.ThreadRun( + () => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (var i = 0; i < count; i++) lock (q2) q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (int i = 0; i < count ;) lock (q2) { if (q2.Count == 0) @@ -85,8 +85,8 @@ i++; } - }), - AsyncPool.InvokeNewThread(() => { + }, + () => { for (int i = 0; i < count ;) lock (q2) { if (q2.Count == 0) @@ -95,8 +95,8 @@ i++; } - }) - } + } + ) .Combine() .Join();