changeset 34:dabf79fde388

fixed race condition in DispatchPool
author cin
date Thu, 10 Apr 2014 04:20:25 +0400 (2014-04-10)
parents b255e4aeef17
children 2880242f987a 6498078ae368
files Implab.Test/AsyncTests.cs Implab.suo Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs
diffstat 4 files changed, 19 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Thu Apr 10 02:39:29 2014 +0400
+++ b/Implab.Test/AsyncTests.cs	Thu Apr 10 04:20:25 2014 +0400
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,100,0)) {
+            using (var pool = new WorkerPool(0,100,100)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.suo has changed
--- a/Implab/Parallels/DispatchPool.cs	Thu Apr 10 02:39:29 2014 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Thu Apr 10 04:20:25 2014 +0400
@@ -155,7 +155,7 @@
             }
         }
 
-        private void EnsurePoolIsAlive() {
+        protected void EnsurePoolIsAlive() {
             if (AllocateThreadSlot(1)) {
                 // if there were no threads in the pool
                 var worker = new Thread(this.Worker);
@@ -164,7 +164,7 @@
             }
         }
 
-        private bool Suspend() {
+        protected virtual bool Suspend() {
             //no tasks left, exit if the thread is no longer needed
             bool last;
             bool requestExit;
@@ -295,7 +295,6 @@
                     InvokeUnit(unit);
                     continue;
                 }
-
                 Interlocked.Decrement(ref m_activeThreads);
 
                 // entering suspend state
--- a/Implab/Parallels/WorkerPool.cs	Thu Apr 10 02:39:29 2014 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Thu Apr 10 04:20:25 2014 +0400
@@ -69,6 +69,22 @@
             return false;
         }
 
+        protected override bool Suspend() {
+            // This override solves race condition
+            // WORKER                   CLIENT
+            // ---------------------------------------
+            // TryDeque == false
+            //                          Enqueue(unit), queueLen++
+            //                          GrowPool? == NO
+            // ActiveThreads--
+            // Suspend
+            //    queueLength > 0
+            // continue
+            if (m_queueLength > 0)
+                return true;
+            return base.Suspend();
+        }
+
         protected override void InvokeUnit(Action unit) {
             unit();
         }