changeset 21:6a56df4ec59e promises

DispatchPool works again, but performance is poor in some cases
author cin
date Tue, 12 Nov 2013 19:52:10 +0400 (2013-11-12)
parents 1c3b3d518480
children 5a35900264f5
files Implab.Test/AsyncTests.cs Implab.suo Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs
diffstat 4 files changed, 80 insertions(+), 70 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Tue Nov 12 02:27:22 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Tue Nov 12 19:52:10 2013 +0400
@@ -151,7 +151,7 @@
             var pool = new WorkerPool(5, 20);
             Assert.AreEqual(5, pool.PoolSize);
             pool.Dispose();
-            Thread.Sleep(200);
+            Thread.Sleep(500);
             Assert.AreEqual(0, pool.PoolSize);
             pool.Dispose();
         }
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,100,0)) {
+            using (var pool = new WorkerPool(0,100,1)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.suo has changed
--- a/Implab/Parallels/DispatchPool.cs	Tue Nov 12 02:27:22 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Tue Nov 12 19:52:10 2013 +0400
@@ -9,12 +9,15 @@
     public abstract class DispatchPool<TUnit> : IDisposable {
         readonly int m_minThreads;
         readonly int m_maxThreads;
-        int m_createdThreads = 0;
-        int m_activeThreads = 0;
-        int m_sleepingThreads = 0;
-        int m_maxRunningThreads = 0;
-        int m_exitRequired = 0;
-        int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit
+
+        int m_createdThreads = 0; // the current size of the pool
+        int m_activeThreads = 0; // the count of threads which are active
+        int m_sleepingThreads = 0; // the count of currently inactive threads
+        int m_maxRunningThreads = 0; // the meximum reached size of the pool
+        int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
+        int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+        int m_wakeEvents = 0; // the count of wake events
+        
         AutoResetEvent m_hasTasks = new AutoResetEvent(false);
 
         protected DispatchPool(int min, int max) {
@@ -72,69 +75,89 @@
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        protected virtual bool ExtendPool() {
-            if (m_sleepingThreads == 0)
-                // no sleeping workers are available
-                // try create one
-                return StartWorker();
-            else {
-                // we can get here a race condition when several threads asks to extend pool
-                // and some sleaping threads are exited due timeout but they are still counted as sleeping
-                // in that case all of this threads could exit except one
-                WakePool();
+        #region thread execution traits
+        int SignalThread() {
+            var signals = Interlocked.Increment(ref m_wakeEvents);
+            if(signals == 1)
+                m_hasTasks.Set();
+            return signals;
+        }
+
+        bool Sleep(int timeout) {
+            Interlocked.Increment(ref m_sleepingThreads);
+            if (m_hasTasks.WaitOne(timeout)) {
+                // this is autoreset event, only one thread can run this block simultaneously
+                var sleeping = Interlocked.Decrement(ref m_sleepingThreads);
+                if (Interlocked.Decrement(ref m_wakeEvents) > 0)
+                    m_hasTasks.Set(); // wake next worker
+
                 return true;
+            } else {
+                Interlocked.Decrement(ref m_sleepingThreads);
+                return false;
             }
-
         }
+        #endregion
 
         /// <summary>
         /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
         /// </summary>
-        protected void WakePool() {
-            m_hasTasks.Set(); // wake sleeping thread;
+        protected void GrowPool() {
+            if (m_exitRequired != 0)
+                return;
+            if (m_sleepingThreads > m_wakeEvents) {
+                // all sleeping threads may gone
+                SignalThread(); // wake a sleeping thread;
 
-            if (AllocateThreadSlot(1)) {
-                // if there were no threads in the pool
-                var worker = new Thread(this.Worker);
-                worker.IsBackground = true;
-                worker.Start();
+                // we can't check whether signal has been processed
+                // anyway it may take some time for the thread to start
+                // we will ensure that at least one thread is running
+
+                if (AllocateThreadSlot(1)) {
+                    // if there were no threads in the pool
+                    var worker = new Thread(this.Worker);
+                    worker.IsBackground = true;
+                    worker.Start();
+                }
+            } else {
+                // if there is no sleeping threads in the pool
+                StartWorker();
             }
         }
 
-        bool Sleep(int timeout) {
-            Interlocked.Increment(ref m_sleepingThreads);
-            var result = m_hasTasks.WaitOne(timeout);
-            Interlocked.Decrement(ref m_sleepingThreads);
-            return result;
-        }
-
-        protected virtual bool Suspend() {
+        private bool Suspend() {
             //no tasks left, exit if the thread is no longer needed
             bool last;
             bool requestExit;
 
+            
+
+            // if threads have a timeout before releasing
             if (m_releaseTimeout > 0)
                 requestExit = !Sleep(m_releaseTimeout);
             else
                 requestExit = true;
-            
 
+            if (!requestExit)
+                return true;
+
+            // release unsused thread
             if (requestExit && ReleaseThreadSlot(out last)) {
                 // in case at the moment the last thread was being released
                 // a new task was added to the queue, we need to try
                 // to revoke the thread to avoid the situation when the task is left unprocessed
-                if (last && m_hasTasks.WaitOne(0)) {
+                if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
                     if (AllocateThreadSlot(1))
                         return true; // spin again...
                     else
-                        // we failed to reallocate the first slot for this thread
-                        // therefore we need to release the event
-                        m_hasTasks.Set();
+                        SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
+                    
                 }
 
                 return false;
             }
 
+            // wait till infinity
             Sleep(-1);
 
             return true;
@@ -215,7 +238,12 @@
             }
         }
 
-        bool FetchTask(out TUnit unit) {
+        protected abstract void InvokeUnit(TUnit unit);
+
+        void Worker() {
+            TUnit unit;
+            Interlocked.Increment(ref m_activeThreads);
+            Sleep(0); // remove wake request if the new thread is started
             do {
                 // exit if requested
                 if (m_exitRequired != 0) {
@@ -224,15 +252,15 @@
                     if (ReleaseThreadSlotAnyway()) // it was the last worker
                         m_hasTasks.Dispose();
                     else
-                        m_hasTasks.Set(); // wake next worker
+                        SignalThread(); // wake next worker
                     unit = default(TUnit);
-                    return false;
+                    break;
                 }
 
                 // fetch task
                 if (TryDequeue(out unit)) {
-                    ExtendPool();
-                    return true;
+                    InvokeUnit(unit);
+                    continue;
                 }
 
                 Interlocked.Decrement(ref m_activeThreads);
@@ -240,19 +268,11 @@
                 // entering suspend state
                 // keep this thread and wait                
                 if (!Suspend())
-                    return false;
+                    break;
 
                 Interlocked.Increment(ref m_activeThreads);
             } while (true);
-        }
-
-        protected abstract void InvokeUnit(TUnit unit);
-
-        void Worker() {
-            TUnit unit;
-            Interlocked.Increment(ref m_activeThreads);
-            while (FetchTask(out unit))
-                InvokeUnit(unit);
+               
         }
 
         protected virtual void Dispose(bool disposing) {
@@ -262,7 +282,10 @@
                         return;
 
                     // wake sleeping threads
-                    m_hasTasks.Set();
+                    if (m_createdThreads > 0)
+                        SignalThread();
+                    else
+                        m_hasTasks.Dispose();
                     GC.SuppressFinalize(this);
                 }
             }
--- a/Implab/Parallels/WorkerPool.cs	Tue Nov 12 02:27:22 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Tue Nov 12 19:52:10 2013 +0400
@@ -57,15 +57,8 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            ExtendPool();
-        }
-
-        protected override bool ExtendPool() {
-            if (m_queueLength <= m_threshold*ActiveThreads)
-                // in this case we are in active thread and it request for additional workers
-                // satisfy it only when queue is longer than threshold
-                return false;
-            return base.ExtendPool();
+            if (len > m_threshold*ActiveThreads)
+                GrowPool();
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -80,11 +73,5 @@
             unit();
         }
 
-        protected override bool Suspend() {
-            if (m_queueLength == 0)
-                return base.Suspend();
-            else
-                return true; // spin again without locks...
-        }
     }
 }