changeset 20:1c3b3d518480 promises

refactoring, sync
author cin
date Tue, 12 Nov 2013 02:27:22 +0400 (2013-11-11)
parents e3935fdf59a2
children 6a56df4ec59e
files Implab.Test/AsyncTests.cs Implab.v11.suo Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs
diffstat 4 files changed, 87 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Sun Nov 10 00:21:33 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Tue Nov 12 02:27:22 2013 +0400
@@ -101,18 +101,18 @@
         public void WorkerPoolSizeTest() {
             var pool = new WorkerPool(5, 10, 0);
 
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
 
             pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
             pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
             pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
 
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
 
             for (int i = 0; i < 100; i++)
                 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
             Thread.Sleep(100);
-            Assert.AreEqual(10, pool.ThreadCount);
+            Assert.AreEqual(10, pool.PoolSize);
 
             pool.Dispose();
         }
@@ -149,10 +149,10 @@
         [TestMethod]
         public void WorkerPoolDisposeTest() {
             var pool = new WorkerPool(5, 20);
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
             pool.Dispose();
-            Thread.Sleep(100);
-            Assert.AreEqual(0, pool.ThreadCount);
+            Thread.Sleep(200);
+            Assert.AreEqual(0, pool.PoolSize);
             pool.Dispose();
         }
 
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(4,4,0)) {
+            using (var pool = new WorkerPool(0,100,0)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.v11.suo has changed
--- a/Implab/Parallels/DispatchPool.cs	Sun Nov 10 00:21:33 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Tue Nov 12 02:27:22 2013 +0400
@@ -9,10 +9,12 @@
     public abstract class DispatchPool<TUnit> : IDisposable {
         readonly int m_minThreads;
         readonly int m_maxThreads;
-        int m_runningThreads = 0;
+        int m_createdThreads = 0;
+        int m_activeThreads = 0;
+        int m_sleepingThreads = 0;
         int m_maxRunningThreads = 0;
-        int m_suspended = 0;
         int m_exitRequired = 0;
+        int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit
         AutoResetEvent m_hasTasks = new AutoResetEvent(false);
 
         protected DispatchPool(int min, int max) {
@@ -44,9 +46,15 @@
                 StartWorker();
         }
 
-        public int ThreadCount {
+        public int PoolSize {
             get {
-                return m_runningThreads;
+                return m_createdThreads;
+            }
+        }
+
+        public int ActiveThreads {
+            get {
+                return m_activeThreads;
             }
         }
 
@@ -65,11 +73,18 @@
         protected abstract bool TryDequeue(out TUnit unit);
 
         protected virtual bool ExtendPool() {
-            if (m_suspended > 0) {
-                m_hasTasks.Set();
+            if (m_sleepingThreads == 0)
+                // no sleeping workers are available
+                // try create one
+                return StartWorker();
+            else {
+                // we can get here a race condition when several threads asks to extend pool
+                // and some sleaping threads are exited due timeout but they are still counted as sleeping
+                // in that case all of this threads could exit except one
+                WakePool();
                 return true;
-            } else
-                return StartWorker();
+            }
+
         }
 
         /// <summary>
@@ -79,14 +94,50 @@
             m_hasTasks.Set(); // wake sleeping thread;
 
             if (AllocateThreadSlot(1)) {
+                // if there were no threads in the pool
                 var worker = new Thread(this.Worker);
                 worker.IsBackground = true;
                 worker.Start();
             }
         }
 
-        protected virtual void Suspend() {
-            m_hasTasks.WaitOne();
+        bool Sleep(int timeout) {
+            Interlocked.Increment(ref m_sleepingThreads);
+            var result = m_hasTasks.WaitOne(timeout);
+            Interlocked.Decrement(ref m_sleepingThreads);
+            return result;
+        }
+
+        protected virtual bool Suspend() {
+            //no tasks left, exit if the thread is no longer needed
+            bool last;
+            bool requestExit;
+
+            if (m_releaseTimeout > 0)
+                requestExit = !Sleep(m_releaseTimeout);
+            else
+                requestExit = true;
+            
+
+            if (requestExit && ReleaseThreadSlot(out last)) {
+                // in case at the moment the last thread was being released
+                // a new task was added to the queue, we need to try
+                // to revoke the thread to avoid the situation when the task is left unprocessed
+                if (last && m_hasTasks.WaitOne(0)) {
+                    if (AllocateThreadSlot(1))
+                        return true; // spin again...
+                    else
+                        // we failed to reallocate the first slot for this thread
+                        // therefore we need to release the event
+                        m_hasTasks.Set();
+                }
+
+                return false;
+            }
+
+            Sleep(-1);
+
+            return true;
         }
 
         #region thread slots traits
@@ -95,11 +146,11 @@
             int current;
             // use spins to allocate slot for the new thread
             do {
-                current = m_runningThreads;
+                current = m_createdThreads;
                 if (current >= m_maxThreads || m_exitRequired != 0)
                     // no more slots left or the pool has been disposed
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
 
             UpdateMaxThreads(current + 1);
 
@@ -107,7 +158,7 @@
         }
 
         bool AllocateThreadSlot(int desired) {
-            if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
+            if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
                 return false;
 
             UpdateMaxThreads(desired);
@@ -120,11 +171,11 @@
             int current;
             // use spins to release slot for the new thread
             do {
-                current = m_runningThreads;
+                current = m_createdThreads;
                 if (current <= m_minThreads && m_exitRequired == 0)
                     // the thread is reserved
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
 
             last = (current == 1);
 
@@ -136,7 +187,7 @@
         /// </summary>
         /// <returns>true - no more threads left</returns>
         bool ReleaseThreadSlotAnyway() {
-            var left = Interlocked.Decrement(ref m_runningThreads);
+            var left = Interlocked.Decrement(ref m_createdThreads);
             return left == 0;
         }
 
@@ -169,6 +220,7 @@
                 // exit if requested
                 if (m_exitRequired != 0) {
                     // release the thread slot
+                    Interlocked.Decrement(ref m_activeThreads);
                     if (ReleaseThreadSlotAnyway()) // it was the last worker
                         m_hasTasks.Dispose();
                     else
@@ -183,26 +235,14 @@
                     return true;
                 }
 
-                //no tasks left, exit if the thread is no longer needed
-                bool last;
-                if (ReleaseThreadSlot(out last)) {
-                    if (last && m_hasTasks.WaitOne(0)) {
-                        if (AllocateThreadSlot(1))
-                            continue; // spin again...
-                        else
-                            // we failed to reallocate slot for this thread
-                            // therefore we need to release the event
-                            m_hasTasks.Set(); 
-                    }
-
-                    return false;
-                }
+                Interlocked.Decrement(ref m_activeThreads);
 
                 // entering suspend state
-                Interlocked.Increment(ref m_suspended);
                 // keep this thread and wait                
-                Suspend();
-                Interlocked.Decrement(ref m_suspended);
+                if (!Suspend())
+                    return false;
+
+                Interlocked.Increment(ref m_activeThreads);
             } while (true);
         }
 
@@ -210,6 +250,7 @@
 
         void Worker() {
             TUnit unit;
+            Interlocked.Increment(ref m_activeThreads);
             while (FetchTask(out unit))
                 InvokeUnit(unit);
         }
--- a/Implab/Parallels/WorkerPool.cs	Sun Nov 10 00:21:33 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Tue Nov 12 02:27:22 2013 +0400
@@ -57,12 +57,11 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            if(!ExtendPool())
-                WakePool();
+            ExtendPool();
         }
 
         protected override bool ExtendPool() {
-            if (m_queueLength <= m_threshold*ThreadCount)
+            if (m_queueLength <= m_threshold*ActiveThreads)
                 // in this case we are in active thread and it request for additional workers
                 // satisfy it only when queue is longer than threshold
                 return false;
@@ -81,9 +80,11 @@
             unit();
         }
 
-        protected override void Suspend() {
+        protected override bool Suspend() {
             if (m_queueLength == 0)
-                base.Suspend();
+                return base.Suspend();
+            else
+                return true; // spin again without locks...
         }
     }
 }