# HG changeset patch # User cin # Date 1384377307 -14400 # Node ID ee04e1fa78da52ee25bf6dbb4786c9a2403163ae # Parent f0568ff069a51973c64a7a6093b48c751a6cb198 fixed dispatch pool race condition diff -r f0568ff069a5 -r ee04e1fa78da Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Thu Nov 14 01:15:07 2013 +0400 @@ -244,7 +244,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(4,4,0)) { + using (var pool = new WorkerPool(0,100,0)) { int count = 10000; double[] args = new double[count]; @@ -255,7 +255,7 @@ var t = Environment.TickCount; var res = args - .ChainedMap( + .ChainedMap2( x => pool.Invoke( () => Math.Sin(x * x) ), diff -r f0568ff069a5 -r ee04e1fa78da Implab.v11.suo Binary file Implab.v11.suo has changed diff -r f0568ff069a5 -r ee04e1fa78da Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Thu Nov 14 01:15:07 2013 +0400 @@ -167,5 +167,46 @@ return promise.Anyway(() => semaphore.Dispose()); } + + /* + this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is + be chained, in this case the syncronous callback invocation will occur + + public static Promise ChainedMap2(this TSrc[] source, ChainedOperation transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise(); + var res = new TDst[source.Length]; + var index = -1; // we will start with increment + var len = source.Length; + var pending = len; + + Action callback = null; + callback = (current) => { + if (current < len) { + transform(source[current]) + .Then( + x => { + res[current] = x; + if (Interlocked.Decrement(ref pending) == 0) + promise.Resolve(res); + else + callback(Interlocked.Increment(ref index)); + }, + e => promise.Reject(e) + ); + } + }; + + for (int i = 0; i < threads; i++) + callback(Interlocked.Increment(ref index)); + return promise; + } + */ } } diff -r f0568ff069a5 -r ee04e1fa78da Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Thu Nov 14 01:15:07 2013 +0400 @@ -151,7 +151,10 @@ } } else { // if there is no sleeping threads in the pool - StartWorker(); + if (!StartWorker()) + // we haven't started a new thread, but the current can be on the way and it can't process the queue + // send it a signal to spin again + SignalThread(); } } @@ -281,7 +284,6 @@ m_hasTasks.Dispose(); else SignalThread(); // wake next worker - unit = default(TUnit); break; } diff -r f0568ff069a5 -r ee04e1fa78da Implab/Parallels/MTQueue.cs --- a/Implab/Parallels/MTQueue.cs Wed Nov 13 14:03:20 2013 +0400 +++ b/Implab/Parallels/MTQueue.cs Thu Nov 14 01:15:07 2013 +0400 @@ -44,7 +44,7 @@ // this is the last element, // then try to update the tail if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - // this is a ace condition + // this is a race condition if (m_last == null) // the queue is empty return false;