diff Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents 2fc0fbe7d58b
children 2c5631b43c7d
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -9,16 +9,17 @@
     public abstract class DispatchPool<TUnit> : IDisposable {
         readonly int m_minThreads;
         readonly int m_maxThreads;
+        readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
 
         int m_createdThreads = 0; // the current size of the pool
         int m_activeThreads = 0; // the count of threads which are active
         int m_sleepingThreads = 0; // the count of currently inactive threads
         int m_maxRunningThreads = 0; // the meximum reached size of the pool
         int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
-        int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+
         int m_wakeEvents = 0; // the count of wake events
         
-        AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+        readonly object m_signalLocker = new object();
 
         protected DispatchPool(int min, int max) {
             if (min < 0)
@@ -51,68 +52,76 @@
 
         public int PoolSize {
             get {
+                Thread.MemoryBarrier();
                 return m_createdThreads;
             }
         }
 
         public int ActiveThreads {
             get {
+                Thread.MemoryBarrier();
                 return m_activeThreads;
             }
         }
 
         public int MaxRunningThreads {
             get {
+                Thread.MemoryBarrier();
                 return m_maxRunningThreads;
             }
         }
 
         protected bool IsDisposed {
             get {
-                return m_exitRequired != 0;
+                Thread.MemoryBarrier();
+                return m_exitRequired == 1;
             }
         }
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        #region thread execution traits
+        #region thread signaling traits
         int SignalThread() {
             var signals = Interlocked.Increment(ref m_wakeEvents);
             if(signals == 1)
-                m_hasTasks.Set();
+                lock(m_signalLocker)
+                    Monitor.Pulse(m_signalLocker);
             return signals;
         }
 
         bool FetchSignalOrWait(int timeout) {
             var start = Environment.TickCount;
-
-            // означает, что поток владеет блокировкой и при успешном получении сигнала должен
-            // ее вернуть, чтобы другой ожидающий поток смог 
-            bool hasLock = false;
+            int signals;
+            Thread.MemoryBarrier(); // m_wakeEvents volatile first read
             do {
-                int signals;
-                do {
-                    signals = m_wakeEvents;
-                    if (signals == 0)
-                        break;
-                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+                signals = m_wakeEvents;
+                if (signals == 0)
+                    break;
+            } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
 
-                if (signals >= 1) {
-                    if (signals > 1 && hasLock)
-                        m_hasTasks.Set();
+            if (signals == 0) {
+                // no signal is fetched
+                lock(m_signalLocker) {
+                    while(m_wakeEvents == 0) {
+                        if (timeout != -1)
+                            timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+                        if(!Monitor.Wait(m_signalLocker,timeout))
+                            return false; // timeout
+                    }
+                    // m_wakeEvents > 0
+                    if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
+                        Monitor.Pulse(m_signalLocker);
+
+                    // signal fetched
                     return true;
                 }
                 
-                if (timeout != -1)
-                    timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+            } else {
+                // signal fetched
+                return true;
+            }
 
-                // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
-                // и уйдет на пустой цикл, после чего заблокируется
 
-                hasLock = true; 
-            } while (m_hasTasks.WaitOne(timeout));
-
-            return false;
         }
 
         bool Sleep(int timeout) {
@@ -131,7 +140,8 @@
         /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
         /// </summary>
         protected void GrowPool() {
-            if (m_exitRequired != 0)
+            Thread.MemoryBarrier();
+            if (m_exitRequired == 1)
                 return;
             if (m_sleepingThreads > m_wakeEvents) {
                 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
@@ -204,7 +214,7 @@
             // use spins to allocate slot for the new thread
             do {
                 current = m_createdThreads;
-                if (current >= m_maxThreads || m_exitRequired != 0)
+                if (current >= m_maxThreads || m_exitRequired == 1)
                     // no more slots left or the pool has been disposed
                     return false;
             } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
@@ -227,6 +237,7 @@
             last = false;
             int current;
             // use spins to release slot for the new thread
+            Thread.MemoryBarrier();
             do {
                 current = m_createdThreads;
                 if (current <= m_minThreads && m_exitRequired == 0)
@@ -264,6 +275,7 @@
                 // slot successfully allocated
                 var worker = new Thread(this.Worker);
                 worker.IsBackground = true;
+                Interlocked.Increment(ref m_activeThreads);
                 worker.Start();
 
                 return true;
@@ -277,15 +289,14 @@
         protected virtual void Worker() {
             TUnit unit;
             //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
-            Interlocked.Increment(ref m_activeThreads);
+            int count = 0;;
+            Thread.MemoryBarrier();
             do {
                 // exit if requested
-                if (m_exitRequired != 0) {
+                if (m_exitRequired == 1) {
                     // release the thread slot
                     Interlocked.Decrement(ref m_activeThreads);
-                    if (ReleaseThreadSlotAnyway()) // it was the last worker
-                        m_hasTasks.Dispose();
-                    else
+                    if (!ReleaseThreadSlotAnyway()) // it was the last worker
                         SignalThread(); // wake next worker
                     break;
                 }
@@ -293,14 +304,17 @@
                 // fetch task
                 if (TryDequeue(out unit)) {
                     InvokeUnit(unit);
+                    count ++;
                     continue;
                 }
                 Interlocked.Decrement(ref m_activeThreads);
 
+                Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
                 // entering suspend state
                 // keep this thread and wait                
                 if (!Suspend())
                     break;
+                count = 0;
                 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
                 Interlocked.Increment(ref m_activeThreads);
             } while (true);
@@ -309,15 +323,10 @@
 
         protected virtual void Dispose(bool disposing) {
             if (disposing) {
-                if (m_exitRequired == 0) {
-                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
-                        return;
-
+                if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
                     // wake sleeping threads
                     if (m_createdThreads > 0)
                         SignalThread();
-                    else
-                        m_hasTasks.Dispose();
                     GC.SuppressFinalize(this);
                 }
             }