Mercurial > pub > ImplabNet
changeset 24:ee04e1fa78da
fixed dispatch pool race condition
author | cin |
---|---|
date | Thu, 14 Nov 2013 01:15:07 +0400 |
parents | f0568ff069a5 |
children | 9bf5b23650c9 |
files | Implab.Test/AsyncTests.cs Implab.v11.suo Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/MTQueue.cs |
diffstat | 5 files changed, 48 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Thu Nov 14 01:15:07 2013 +0400 @@ -244,7 +244,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(4,4,0)) { + using (var pool = new WorkerPool(0,100,0)) { int count = 10000; double[] args = new double[count]; @@ -255,7 +255,7 @@ var t = Environment.TickCount; var res = args - .ChainedMap( + .ChainedMap2( x => pool.Invoke( () => Math.Sin(x * x) ),
--- a/Implab/Parallels/ArrayTraits.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Thu Nov 14 01:15:07 2013 +0400 @@ -167,5 +167,46 @@ return promise.Anyway(() => semaphore.Dispose()); } + + /* + this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is + be chained, in this case the syncronous callback invocation will occur + + public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise<TDst[]>(); + var res = new TDst[source.Length]; + var index = -1; // we will start with increment + var len = source.Length; + var pending = len; + + Action<int> callback = null; + callback = (current) => { + if (current < len) { + transform(source[current]) + .Then( + x => { + res[current] = x; + if (Interlocked.Decrement(ref pending) == 0) + promise.Resolve(res); + else + callback(Interlocked.Increment(ref index)); + }, + e => promise.Reject(e) + ); + } + }; + + for (int i = 0; i < threads; i++) + callback(Interlocked.Increment(ref index)); + return promise; + } + */ } }
--- a/Implab/Parallels/DispatchPool.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Thu Nov 14 01:15:07 2013 +0400 @@ -151,7 +151,10 @@ } } else { // if there is no sleeping threads in the pool - StartWorker(); + if (!StartWorker()) + // we haven't started a new thread, but the current can be on the way and it can't process the queue + // send it a signal to spin again + SignalThread(); } } @@ -281,7 +284,6 @@ m_hasTasks.Dispose(); else SignalThread(); // wake next worker - unit = default(TUnit); break; }
--- a/Implab/Parallels/MTQueue.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/MTQueue.cs Thu Nov 14 01:15:07 2013 +0400 @@ -44,7 +44,7 @@ // this is the last element, // then try to update the tail if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - // this is a ace condition + // this is a race condition if (m_last == null) // the queue is empty return false;