changeset 22:5a35900264f5 promises

implemented nonblocking wake singnals processing
author cin
date Wed, 13 Nov 2013 14:03:00 +0400
parents 6a56df4ec59e
children f0568ff069a5
files Implab.Test/AsyncTests.cs Implab.suo Implab/Parallels/DispatchPool.cs
diffstat 3 files changed, 45 insertions(+), 17 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Tue Nov 12 19:52:10 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Wed Nov 13 14:03:00 2013 +0400
@@ -103,15 +103,15 @@
 
             Assert.AreEqual(5, pool.PoolSize);
 
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
 
             Assert.AreEqual(5, pool.PoolSize);
 
             for (int i = 0; i < 100; i++)
-                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            Thread.Sleep(100);
+                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            Thread.Sleep(200);
             Assert.AreEqual(10, pool.PoolSize);
 
             pool.Dispose();
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,100,1)) {
+            using (var pool = new WorkerPool(4,4,0)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.suo has changed
--- a/Implab/Parallels/DispatchPool.cs	Tue Nov 12 19:52:10 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Wed Nov 13 14:03:00 2013 +0400
@@ -83,14 +83,42 @@
             return signals;
         }
 
+        bool FetchSignalOrWait(int timeout) {
+            var start = Environment.TickCount;
+
+            // означает, что поток владеет блокировкой и при успешном получении сигнала должен
+            // ее вернуть, чтобы другой ожидающий поток смог 
+            bool hasLock = false;
+            do {
+                int signals;
+                do {
+                    signals = m_wakeEvents;
+                    if (signals == 0)
+                        break;
+                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+
+                if (signals >= 1) {
+                    if (signals > 1 && hasLock)
+                        m_hasTasks.Set();
+                    return true;
+                }
+                
+                if (timeout != -1)
+                    timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+
+                // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
+                // и уйдет на пустой цикл, после чего заблокируется
+
+                hasLock = true; 
+            } while (m_hasTasks.WaitOne(timeout));
+
+            return false;
+        }
+
         bool Sleep(int timeout) {
             Interlocked.Increment(ref m_sleepingThreads);
-            if (m_hasTasks.WaitOne(timeout)) {
-                // this is autoreset event, only one thread can run this block simultaneously
-                var sleeping = Interlocked.Decrement(ref m_sleepingThreads);
-                if (Interlocked.Decrement(ref m_wakeEvents) > 0)
-                    m_hasTasks.Set(); // wake next worker
-
+            if (FetchSignalOrWait(timeout)) {
+                Interlocked.Decrement(ref m_sleepingThreads);
                 return true;
             } else {
                 Interlocked.Decrement(ref m_sleepingThreads);
@@ -106,6 +134,8 @@
             if (m_exitRequired != 0)
                 return;
             if (m_sleepingThreads > m_wakeEvents) {
+                //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
+
                 // all sleeping threads may gone
                 SignalThread(); // wake a sleeping thread;
 
@@ -130,8 +160,6 @@
             bool last;
             bool requestExit;
 
-            
-
             // if threads have a timeout before releasing
             if (m_releaseTimeout > 0)
                 requestExit = !Sleep(m_releaseTimeout);
@@ -242,8 +270,8 @@
 
         void Worker() {
             TUnit unit;
+            //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
             Interlocked.Increment(ref m_activeThreads);
-            Sleep(0); // remove wake request if the new thread is started
             do {
                 // exit if requested
                 if (m_exitRequired != 0) {
@@ -269,10 +297,10 @@
                 // keep this thread and wait                
                 if (!Suspend())
                     break;
-
+                //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
                 Interlocked.Increment(ref m_activeThreads);
             } while (true);
-               
+            //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
         }
 
         protected virtual void Dispose(bool disposing) {