changeset 81:2c5631b43c7d v2

dispatch pool rewritten
author cin
date Fri, 26 Sep 2014 20:44:01 +0400
parents 4f20870d0816
children 0363407ee75c
files Implab.Test/AsyncTests.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs
diffstat 3 files changed, 60 insertions(+), 216 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Fri Sep 26 03:32:34 2014 +0400
+++ b/Implab.Test/AsyncTests.cs	Fri Sep 26 20:44:01 2014 +0400
@@ -148,7 +148,7 @@
 
         [TestMethod]
         public void WorkerPoolSizeTest() {
-            var pool = new WorkerPool(5, 10, 0);
+            var pool = new WorkerPool(5, 10, 1);
 
             Assert.AreEqual(5, pool.PoolSize);
 
@@ -291,7 +291,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,10,100)) {
+            using (var pool = new WorkerPool(0,10,1)) {
                 const int count = 10000;
 
                 var args = new double[count];
--- a/Implab/Parallels/DispatchPool.cs	Fri Sep 26 03:32:34 2014 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Fri Sep 26 20:44:01 2014 +0400
@@ -7,19 +7,15 @@
 
 namespace Implab.Parallels {
     public abstract class DispatchPool<TUnit> : IDisposable {
-        readonly int m_minThreads;
-        readonly int m_maxThreads;
-        readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+        readonly int m_minThreadsLimit;
+        readonly int m_maxThreadsLimit;
+        readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
 
-        int m_createdThreads = 0; // the current size of the pool
-        int m_activeThreads = 0; // the count of threads which are active
-        int m_sleepingThreads = 0; // the count of currently inactive threads
+        int m_threads = 0; // the current size of the pool
         int m_maxRunningThreads = 0; // the meximum reached size of the pool
-        int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
+        int m_exit = 0; // the pool is going to shutdown, all unused workers are released
 
-        int m_wakeEvents = 0; // the count of wake events
-        
-        readonly object m_signalLocker = new object();
+        readonly object m_signal = new object(); // used to pulse waiting threads
 
         protected DispatchPool(int min, int max) {
             if (min < 0)
@@ -29,8 +25,8 @@
 
             if (min > max)
                 min = max;
-            m_minThreads = min;
-            m_maxThreads = max;
+            m_minThreadsLimit = min;
+            m_maxThreadsLimit = max;
         }
 
         protected DispatchPool(int threads)
@@ -41,29 +37,22 @@
             int maxThreads, maxCP;
             ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
 
-            m_minThreads = 0;
-            m_maxThreads = maxThreads;
+            m_minThreadsLimit = 0;
+            m_maxThreadsLimit = maxThreads;
         }
 
         protected void InitPool() {
-            for (int i = 0; i < m_minThreads; i++)
+            for (int i = 0; i < m_minThreadsLimit; i++)
                 StartWorker();
         }
 
         public int PoolSize {
             get {
                 Thread.MemoryBarrier();
-                return m_createdThreads;
+                return m_threads;
             }
         }
-
-        public int ActiveThreads {
-            get {
-                Thread.MemoryBarrier();
-                return m_activeThreads;
-            }
-        }
-
+            
         public int MaxRunningThreads {
             get {
                 Thread.MemoryBarrier();
@@ -74,150 +63,47 @@
         protected bool IsDisposed {
             get {
                 Thread.MemoryBarrier();
-                return m_exitRequired == 1;
+                return m_exit == 1;
             }
         }
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        #region thread signaling traits
-        int SignalThread() {
-            var signals = Interlocked.Increment(ref m_wakeEvents);
-            if(signals == 1)
-                lock(m_signalLocker)
-                    Monitor.Pulse(m_signalLocker);
-            return signals;
-        }
-
-        bool FetchSignalOrWait(int timeout) {
-            var start = Environment.TickCount;
-            int signals;
-            Thread.MemoryBarrier(); // m_wakeEvents volatile first read
-            do {
-                signals = m_wakeEvents;
-                if (signals == 0)
-                    break;
-            } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
-
-            if (signals == 0) {
-                // no signal is fetched
-                lock(m_signalLocker) {
-                    while(m_wakeEvents == 0) {
-                        if (timeout != -1)
-                            timeout = Math.Max(0, timeout - (Environment.TickCount - start));
-                        if(!Monitor.Wait(m_signalLocker,timeout))
-                            return false; // timeout
+        private bool Dequeue(out TUnit unit, int timeout) {
+            int ts = Environment.TickCount;
+            if (TryDequeue(out unit))
+                return true;
+            lock (m_signal) {
+                while (!TryDequeue(out unit) && m_exit == 0)
+                    if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
+                        // timeout
+                        return false;
                     }
-                    // m_wakeEvents > 0
-                    if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
-                        Monitor.Pulse(m_signalLocker);
-
-                    // signal fetched
-                    return true;
-                }
-                
-            } else {
-                // signal fetched
-                return true;
+                // queue item or terminate
+                Monitor.Pulse(m_signal);
+                if (m_exit == 1)
+                    return false;
             }
-
-
+            return true;
         }
 
-        bool Sleep(int timeout) {
-            Interlocked.Increment(ref m_sleepingThreads);
-            if (FetchSignalOrWait(timeout)) {
-                Interlocked.Decrement(ref m_sleepingThreads);
-                return true;
-            } else {
-                Interlocked.Decrement(ref m_sleepingThreads);
-                return false;
-            }
-        }
-        #endregion
-
-        /// <summary>
-        /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
-        /// </summary>
-        protected void GrowPool() {
-            Thread.MemoryBarrier();
-            if (m_exitRequired == 1)
-                return;
-            if (m_sleepingThreads > m_wakeEvents) {
-                //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
-
-                // all sleeping threads may gone
-                SignalThread(); // wake a sleeping thread;
-
-                // we can't check whether signal has been processed
-                // anyway it may take some time for the thread to start
-                // we will ensure that at least one thread is running
-
-                EnsurePoolIsAlive();
-            } else {
-                // if there is no sleeping threads in the pool
-                if (!StartWorker()) {
-                    // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
-                    // send it a signal to spin again
-                    SignalThread();
-                    EnsurePoolIsAlive();
-                }
+        protected void SignalThread() {
+            lock (m_signal) {
+                Monitor.Pulse(m_signal);
             }
         }
 
-        protected void EnsurePoolIsAlive() {
-            if (AllocateThreadSlot(1)) {
-                // if there were no threads in the pool
-                var worker = new Thread(this.Worker);
-                worker.IsBackground = true;
-                worker.Start();
-            }
-        }
-
-        protected virtual bool Suspend() {
-            //no tasks left, exit if the thread is no longer needed
-            bool last;
-            bool requestExit;
-
-            // if threads have a timeout before releasing
-            if (m_releaseTimeout > 0)
-                requestExit = !Sleep(m_releaseTimeout);
-            else
-                requestExit = true;
-
-            if (!requestExit)
-                return true;
-
-            // release unsused thread
-            if (requestExit && ReleaseThreadSlot(out last)) {
-                // in case at the moment the last thread was being released
-                // a new task was added to the queue, we need to try
-                // to revoke the thread to avoid the situation when the task is left unprocessed
-                if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
-                    SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
-                    return AllocateThreadSlot(1); // ensure that at least one thread is alive
-                }
-
-                return false;
-            }
-
-            // wait till infinity
-            Sleep(-1);
-
-            return true;
-        }
-
         #region thread slots traits
 
         bool AllocateThreadSlot() {
             int current;
             // use spins to allocate slot for the new thread
             do {
-                current = m_createdThreads;
-                if (current >= m_maxThreads || m_exitRequired == 1)
+                current = m_threads;
+                if (current >= m_maxThreadsLimit || m_exit == 1)
                     // no more slots left or the pool has been disposed
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
 
             UpdateMaxThreads(current + 1);
 
@@ -225,7 +111,7 @@
         }
 
         bool AllocateThreadSlot(int desired) {
-            if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
+            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                 return false;
 
             UpdateMaxThreads(desired);
@@ -239,26 +125,17 @@
             // use spins to release slot for the new thread
             Thread.MemoryBarrier();
             do {
-                current = m_createdThreads;
-                if (current <= m_minThreads && m_exitRequired == 0)
+                current = m_threads;
+                if (current <= m_minThreadsLimit && m_exit == 0)
                     // the thread is reserved
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
 
             last = (current == 1);
 
             return true;
         }
 
-        /// <summary>
-        /// releases thread slot unconditionally, used during cleanup
-        /// </summary>
-        /// <returns>true - no more threads left</returns>
-        bool ReleaseThreadSlotAnyway() {
-            var left = Interlocked.Decrement(ref m_createdThreads);
-            return left == 0;
-        }
-
         void UpdateMaxThreads(int count) {
             int max;
             do {
@@ -270,12 +147,11 @@
 
         #endregion
 
-        bool StartWorker() {
+        protected bool StartWorker() {
             if (AllocateThreadSlot()) {
                 // slot successfully allocated
                 var worker = new Thread(this.Worker);
                 worker.IsBackground = true;
-                Interlocked.Increment(ref m_activeThreads);
                 worker.Start();
 
                 return true;
@@ -288,45 +164,30 @@
 
         protected virtual void Worker() {
             TUnit unit;
-            //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
-            int count = 0;;
-            Thread.MemoryBarrier();
+            bool last;
             do {
-                // exit if requested
-                if (m_exitRequired == 1) {
-                    // release the thread slot
-                    Interlocked.Decrement(ref m_activeThreads);
-                    if (!ReleaseThreadSlotAnyway()) // it was the last worker
-                        SignalThread(); // wake next worker
-                    break;
-                }
-
-                // fetch task
-                if (TryDequeue(out unit)) {
+                while (Dequeue(out unit, m_releaseTimeout)) {
                     InvokeUnit(unit);
-                    count ++;
+                }
+                if(!ReleaseThreadSlot(out last))
                     continue;
+                // queue may be not empty
+                if (last && TryDequeue(out unit)) {
+                    InvokeUnit(unit);
+                    if (AllocateThreadSlot(1))
+                        continue;
+                    // we can safely exit since pool is alive
                 }
-                Interlocked.Decrement(ref m_activeThreads);
+                break;
+            } while(true);
+        }
 
-                Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
-                // entering suspend state
-                // keep this thread and wait                
-                if (!Suspend())
-                    break;
-                count = 0;
-                //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
-                Interlocked.Increment(ref m_activeThreads);
-            } while (true);
-            //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
-        }
 
         protected virtual void Dispose(bool disposing) {
             if (disposing) {
-                if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
+                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                     // wake sleeping threads
-                    if (m_createdThreads > 0)
-                        SignalThread();
+                    SignalThread();
                     GC.SuppressFinalize(this);
                 }
             }
--- a/Implab/Parallels/WorkerPool.cs	Fri Sep 26 03:32:34 2014 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Fri Sep 26 20:44:01 2014 +0400
@@ -66,10 +66,11 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            if (len > m_threshold * m_workers) {
-                Interlocked.Increment(ref m_workers);
-                GrowPool();
+            if (len > m_threshold * PoolSize) {
+                StartWorker();
             }
+
+            SignalThread();
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -80,24 +81,6 @@
             return false;
         }
 
-        protected override bool Suspend() {
-            // This override solves race condition
-            // WORKER                   CLIENT
-            // ---------------------------------------
-            // TryDeque == false
-            //                          Enqueue(unit), queueLen++
-            //                          GrowPool? == NO
-            // ActiveThreads--
-            // Suspend
-            //    queueLength > 0
-            // continue
-            Thread.MemoryBarrier();
-            if (m_queueLength > 0)
-                return true;
-            Interlocked.Decrement(ref m_workers);
-            return base.Suspend();
-        }
-
         protected override void InvokeUnit(Action unit) {
             unit();
         }