changeset 24:ee04e1fa78da

fixed dispatch pool race condition
author cin
date Thu, 14 Nov 2013 01:15:07 +0400
parents f0568ff069a5
children 9bf5b23650c9
files Implab.Test/AsyncTests.cs Implab.v11.suo Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/MTQueue.cs
diffstat 5 files changed, 48 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Nov 13 14:03:20 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Thu Nov 14 01:15:07 2013 +0400
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(4,4,0)) {
+            using (var pool = new WorkerPool(0,100,0)) {
                 int count = 10000;
 
                 double[] args = new double[count];
@@ -255,7 +255,7 @@
 
                 var t = Environment.TickCount;
                 var res = args
-                    .ChainedMap(
+                    .ChainedMap2(
                         x => pool.Invoke(
                             () => Math.Sin(x * x)
                         ),
Binary file Implab.v11.suo has changed
--- a/Implab/Parallels/ArrayTraits.cs	Wed Nov 13 14:03:20 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Nov 14 01:15:07 2013 +0400
@@ -167,5 +167,46 @@
 
             return promise.Anyway(() => semaphore.Dispose());
         }
+
+        /*
+        this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
+        be chained, in this case the syncronous callback invocation will occur
+        
+        public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+            if (threads <= 0)
+                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+
+            var promise = new Promise<TDst[]>();
+            var res = new TDst[source.Length];
+            var index = -1; // we will start with increment
+            var len = source.Length;
+            var pending = len;
+
+            Action<int> callback = null;
+            callback = (current) => {
+                if (current < len) {
+                    transform(source[current])
+                        .Then(
+                            x => {
+                                res[current] = x;
+                                if (Interlocked.Decrement(ref pending) == 0)
+                                    promise.Resolve(res);
+                                else
+                                    callback(Interlocked.Increment(ref index));
+                            },
+                            e => promise.Reject(e)
+                        );
+                }
+            };
+
+            for (int i = 0; i < threads; i++)
+                callback(Interlocked.Increment(ref index));
+            return promise;
+        }
+         */
     }
 }
--- a/Implab/Parallels/DispatchPool.cs	Wed Nov 13 14:03:20 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Thu Nov 14 01:15:07 2013 +0400
@@ -151,7 +151,10 @@
                 }
             } else {
                 // if there is no sleeping threads in the pool
-                StartWorker();
+                if (!StartWorker())
+                    // we haven't started a new thread, but the current can be on the way and it can't process the queue
+                    // send it a signal to spin again
+                    SignalThread(); 
             }
         }
 
@@ -281,7 +284,6 @@
                         m_hasTasks.Dispose();
                     else
                         SignalThread(); // wake next worker
-                    unit = default(TUnit);
                     break;
                 }
 
--- a/Implab/Parallels/MTQueue.cs	Wed Nov 13 14:03:20 2013 +0400
+++ b/Implab/Parallels/MTQueue.cs	Thu Nov 14 01:15:07 2013 +0400
@@ -44,7 +44,7 @@
                     // this is the last element,
                     // then try to update the tail
                     if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-                        // this is a ace condition
+                        // this is a race condition
                         if (m_last == null)
                             // the queue is empty
                             return false;