diff Implab/Parallels/DispatchPool.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents f803565868a4
children
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs	Wed Sep 03 18:34:02 2014 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -1,24 +1,17 @@
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
 using System.Threading;
-using System.Diagnostics;
 
 namespace Implab.Parallels {
     public abstract class DispatchPool<TUnit> : IDisposable {
-        readonly int m_minThreads;
-        readonly int m_maxThreads;
+        readonly int m_minThreadsLimit;
+        readonly int m_maxThreadsLimit;
+        readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
 
-        int m_createdThreads = 0; // the current size of the pool
-        int m_activeThreads = 0; // the count of threads which are active
-        int m_sleepingThreads = 0; // the count of currently inactive threads
-        int m_maxRunningThreads = 0; // the meximum reached size of the pool
-        int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
-        int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
-        int m_wakeEvents = 0; // the count of wake events
-        
-        AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+        int m_threads; // the current size of the pool
+        int m_maxRunningThreads; // the meximum reached size of the pool
+        int m_exit; // the pool is going to shutdown, all unused workers are released
+
+        readonly object m_signal = new object(); // used to pulse waiting threads
 
         protected DispatchPool(int min, int max) {
             if (min < 0)
@@ -28,8 +21,8 @@
 
             if (min > max)
                 min = max;
-            m_minThreads = min;
-            m_maxThreads = max;
+            m_minThreadsLimit = min;
+            m_maxThreadsLimit = max;
         }
 
         protected DispatchPool(int threads)
@@ -37,177 +30,74 @@
         }
 
         protected DispatchPool() {
-            int maxThreads, maxCP;
-            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
 
-            m_minThreads = 0;
-            m_maxThreads = maxThreads;
+            m_minThreadsLimit = 0;
+            m_maxThreadsLimit = Environment.ProcessorCount;
         }
 
         protected void InitPool() {
-            for (int i = 0; i < m_minThreads; i++)
+            for (int i = 0; i < m_minThreadsLimit; i++)
                 StartWorker();
         }
 
         public int PoolSize {
             get {
-                return m_createdThreads;
+                Thread.MemoryBarrier();
+                return m_threads;
             }
         }
-
-        public int ActiveThreads {
-            get {
-                return m_activeThreads;
-            }
-        }
-
+            
         public int MaxRunningThreads {
             get {
+                Thread.MemoryBarrier();
                 return m_maxRunningThreads;
             }
         }
 
         protected bool IsDisposed {
             get {
-                return m_exitRequired != 0;
+                Thread.MemoryBarrier();
+                return m_exit == 1;
             }
         }
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        #region thread execution traits
-        int SignalThread() {
-            var signals = Interlocked.Increment(ref m_wakeEvents);
-            if(signals == 1)
-                m_hasTasks.Set();
-            return signals;
-        }
-
-        bool FetchSignalOrWait(int timeout) {
-            var start = Environment.TickCount;
-
-            // означает, что поток владеет блокировкой и при успешном получении сигнала должен
-            // ее вернуть, чтобы другой ожидающий поток смог 
-            bool hasLock = false;
-            do {
-                int signals;
-                do {
-                    signals = m_wakeEvents;
-                    if (signals == 0)
-                        break;
-                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
-
-                if (signals >= 1) {
-                    if (signals > 1 && hasLock)
-                        m_hasTasks.Set();
-                    return true;
-                }
-                
-                if (timeout != -1)
-                    timeout = Math.Max(0, timeout - (Environment.TickCount - start));
-
-                // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
-                // и уйдет на пустой цикл, после чего заблокируется
-
-                hasLock = true; 
-            } while (m_hasTasks.WaitOne(timeout));
-
-            return false;
+        bool Dequeue(out TUnit unit, int timeout) {
+            int ts = Environment.TickCount;
+            if (TryDequeue(out unit))
+                return true;
+            lock (m_signal) {
+                while (!TryDequeue(out unit) && m_exit == 0)
+                    if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
+                        // timeout
+                        return false;
+                    }
+                // queue item or terminate
+                Monitor.Pulse(m_signal);
+                if (m_exit == 1)
+                    return false;
+            }
+            return true;
         }
 
-        bool Sleep(int timeout) {
-            Interlocked.Increment(ref m_sleepingThreads);
-            if (FetchSignalOrWait(timeout)) {
-                Interlocked.Decrement(ref m_sleepingThreads);
-                return true;
-            } else {
-                Interlocked.Decrement(ref m_sleepingThreads);
-                return false;
-            }
-        }
-        #endregion
-
-        /// <summary>
-        /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
-        /// </summary>
-        protected void GrowPool() {
-            if (m_exitRequired != 0)
-                return;
-            if (m_sleepingThreads > m_wakeEvents) {
-                //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
-
-                // all sleeping threads may gone
-                SignalThread(); // wake a sleeping thread;
-
-                // we can't check whether signal has been processed
-                // anyway it may take some time for the thread to start
-                // we will ensure that at least one thread is running
-
-                EnsurePoolIsAlive();
-            } else {
-                // if there is no sleeping threads in the pool
-                if (!StartWorker()) {
-                    // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
-                    // send it a signal to spin again
-                    SignalThread();
-                    EnsurePoolIsAlive();
-                }
+        protected void SignalThread() {
+            lock (m_signal) {
+                Monitor.Pulse(m_signal);
             }
         }
 
-        protected void EnsurePoolIsAlive() {
-            if (AllocateThreadSlot(1)) {
-                // if there were no threads in the pool
-                var worker = new Thread(this.Worker);
-                worker.IsBackground = true;
-                worker.Start();
-            }
-        }
-
-        protected virtual bool Suspend() {
-            //no tasks left, exit if the thread is no longer needed
-            bool last;
-            bool requestExit;
-
-            // if threads have a timeout before releasing
-            if (m_releaseTimeout > 0)
-                requestExit = !Sleep(m_releaseTimeout);
-            else
-                requestExit = true;
-
-            if (!requestExit)
-                return true;
-
-            // release unsused thread
-            if (requestExit && ReleaseThreadSlot(out last)) {
-                // in case at the moment the last thread was being released
-                // a new task was added to the queue, we need to try
-                // to revoke the thread to avoid the situation when the task is left unprocessed
-                if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
-                    SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
-                    return AllocateThreadSlot(1); // ensure that at least one thread is alive
-                }
-
-                return false;
-            }
-
-            // wait till infinity
-            Sleep(-1);
-
-            return true;
-        }
-
         #region thread slots traits
 
         bool AllocateThreadSlot() {
             int current;
             // use spins to allocate slot for the new thread
             do {
-                current = m_createdThreads;
-                if (current >= m_maxThreads || m_exitRequired != 0)
+                current = m_threads;
+                if (current >= m_maxThreadsLimit || m_exit == 1)
                     // no more slots left or the pool has been disposed
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
 
             UpdateMaxThreads(current + 1);
 
@@ -215,7 +105,7 @@
         }
 
         bool AllocateThreadSlot(int desired) {
-            if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
+            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                 return false;
 
             UpdateMaxThreads(desired);
@@ -227,27 +117,19 @@
             last = false;
             int current;
             // use spins to release slot for the new thread
+            Thread.MemoryBarrier();
             do {
-                current = m_createdThreads;
-                if (current <= m_minThreads && m_exitRequired == 0)
+                current = m_threads;
+                if (current <= m_minThreadsLimit && m_exit == 0)
                     // the thread is reserved
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
 
             last = (current == 1);
 
             return true;
         }
 
-        /// <summary>
-        /// releases thread slot unconditionally, used during cleanup
-        /// </summary>
-        /// <returns>true - no more threads left</returns>
-        bool ReleaseThreadSlotAnyway() {
-            var left = Interlocked.Decrement(ref m_createdThreads);
-            return left == 0;
-        }
-
         void UpdateMaxThreads(int count) {
             int max;
             do {
@@ -259,65 +141,46 @@
 
         #endregion
 
-        bool StartWorker() {
+        protected bool StartWorker() {
             if (AllocateThreadSlot()) {
                 // slot successfully allocated
-                var worker = new Thread(this.Worker);
+                var worker = new Thread(Worker);
                 worker.IsBackground = true;
                 worker.Start();
 
                 return true;
-            } else {
-                return false;
             }
+            return false;
         }
 
         protected abstract void InvokeUnit(TUnit unit);
 
         protected virtual void Worker() {
             TUnit unit;
-            //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
-            Interlocked.Increment(ref m_activeThreads);
+            bool last;
             do {
-                // exit if requested
-                if (m_exitRequired != 0) {
-                    // release the thread slot
-                    Interlocked.Decrement(ref m_activeThreads);
-                    if (ReleaseThreadSlotAnyway()) // it was the last worker
-                        m_hasTasks.Dispose();
-                    else
-                        SignalThread(); // wake next worker
-                    break;
+                while (Dequeue(out unit, m_releaseTimeout)) {
+                    InvokeUnit(unit);
                 }
+                if(!ReleaseThreadSlot(out last))
+                    continue;
+                // queue may be not empty
+                if (last && TryDequeue(out unit)) {
+                    InvokeUnit(unit);
+                    if (AllocateThreadSlot(1))
+                        continue;
+                    // we can safely exit since pool is alive
+                }
+                break;
+            } while(true);
+        }
 
-                // fetch task
-                if (TryDequeue(out unit)) {
-                    InvokeUnit(unit);
-                    continue;
-                }
-                Interlocked.Decrement(ref m_activeThreads);
-
-                // entering suspend state
-                // keep this thread and wait                
-                if (!Suspend())
-                    break;
-                //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
-                Interlocked.Increment(ref m_activeThreads);
-            } while (true);
-            //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
-        }
 
         protected virtual void Dispose(bool disposing) {
             if (disposing) {
-                if (m_exitRequired == 0) {
-                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
-                        return;
-
+                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                     // wake sleeping threads
-                    if (m_createdThreads > 0)
-                        SignalThread();
-                    else
-                        m_hasTasks.Dispose();
+                    SignalThread();
                     GC.SuppressFinalize(this);
                 }
             }