# HG changeset patch # User cin # Date 1384336980 -14400 # Node ID 5a35900264f5d7a3e7a5adf452791da58790804d # Parent 6a56df4ec59e8a1fa1f783feb68fe3527d129070 implemented nonblocking wake singnals processing diff -r 6a56df4ec59e -r 5a35900264f5 Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Tue Nov 12 19:52:10 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Wed Nov 13 14:03:00 2013 +0400 @@ -103,15 +103,15 @@ Assert.AreEqual(5, pool.PoolSize); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Assert.AreEqual(5, pool.PoolSize); for (int i = 0; i < 100; i++) - pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - Thread.Sleep(100); + pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); + Thread.Sleep(200); Assert.AreEqual(10, pool.PoolSize); pool.Dispose(); @@ -244,7 +244,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,100,1)) { + using (var pool = new WorkerPool(4,4,0)) { int count = 10000; double[] args = new double[count]; diff -r 6a56df4ec59e -r 5a35900264f5 Implab.suo Binary file Implab.suo has changed diff -r 6a56df4ec59e -r 5a35900264f5 Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs Tue Nov 12 19:52:10 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Wed Nov 13 14:03:00 2013 +0400 @@ -83,14 +83,42 @@ return signals; } + bool FetchSignalOrWait(int timeout) { + var start = Environment.TickCount; + + // означает, что поток владеет блокировкой и при успешном получении сигнала должен + // ее вернуть, чтобы другой ожидающий поток смог + bool hasLock = false; + do { + int signals; + do { + signals = m_wakeEvents; + if (signals == 0) + break; + } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); + + if (signals >= 1) { + if (signals > 1 && hasLock) + m_hasTasks.Set(); + return true; + } + + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + + // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие + // и уйдет на пустой цикл, после чего заблокируется + + hasLock = true; + } while (m_hasTasks.WaitOne(timeout)); + + return false; + } + bool Sleep(int timeout) { Interlocked.Increment(ref m_sleepingThreads); - if (m_hasTasks.WaitOne(timeout)) { - // this is autoreset event, only one thread can run this block simultaneously - var sleeping = Interlocked.Decrement(ref m_sleepingThreads); - if (Interlocked.Decrement(ref m_wakeEvents) > 0) - m_hasTasks.Set(); // wake next worker - + if (FetchSignalOrWait(timeout)) { + Interlocked.Decrement(ref m_sleepingThreads); return true; } else { Interlocked.Decrement(ref m_sleepingThreads); @@ -106,6 +134,8 @@ if (m_exitRequired != 0) return; if (m_sleepingThreads > m_wakeEvents) { + //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); + // all sleeping threads may gone SignalThread(); // wake a sleeping thread; @@ -130,8 +160,6 @@ bool last; bool requestExit; - - // if threads have a timeout before releasing if (m_releaseTimeout > 0) requestExit = !Sleep(m_releaseTimeout); @@ -242,8 +270,8 @@ void Worker() { TUnit unit; + //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); - Sleep(0); // remove wake request if the new thread is started do { // exit if requested if (m_exitRequired != 0) { @@ -269,10 +297,10 @@ // keep this thread and wait if (!Suspend()) break; - + //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); - + //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); } protected virtual void Dispose(bool disposing) {