Mercurial > pub > ImplabNet
diff Implab/Parallels/DispatchPool.cs @ 22:5a35900264f5 promises
implemented nonblocking wake singnals processing
author | cin |
---|---|
date | Wed, 13 Nov 2013 14:03:00 +0400 |
parents | 6a56df4ec59e |
children | ee04e1fa78da |
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs Tue Nov 12 19:52:10 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Wed Nov 13 14:03:00 2013 +0400 @@ -83,14 +83,42 @@ return signals; } + bool FetchSignalOrWait(int timeout) { + var start = Environment.TickCount; + + // означает, что поток владеет блокировкой и при успешном получении сигнала должен + // ее вернуть, чтобы другой ожидающий поток смог + bool hasLock = false; + do { + int signals; + do { + signals = m_wakeEvents; + if (signals == 0) + break; + } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); + + if (signals >= 1) { + if (signals > 1 && hasLock) + m_hasTasks.Set(); + return true; + } + + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + + // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие + // и уйдет на пустой цикл, после чего заблокируется + + hasLock = true; + } while (m_hasTasks.WaitOne(timeout)); + + return false; + } + bool Sleep(int timeout) { Interlocked.Increment(ref m_sleepingThreads); - if (m_hasTasks.WaitOne(timeout)) { - // this is autoreset event, only one thread can run this block simultaneously - var sleeping = Interlocked.Decrement(ref m_sleepingThreads); - if (Interlocked.Decrement(ref m_wakeEvents) > 0) - m_hasTasks.Set(); // wake next worker - + if (FetchSignalOrWait(timeout)) { + Interlocked.Decrement(ref m_sleepingThreads); return true; } else { Interlocked.Decrement(ref m_sleepingThreads); @@ -106,6 +134,8 @@ if (m_exitRequired != 0) return; if (m_sleepingThreads > m_wakeEvents) { + //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); + // all sleeping threads may gone SignalThread(); // wake a sleeping thread; @@ -130,8 +160,6 @@ bool last; bool requestExit; - - // if threads have a timeout before releasing if (m_releaseTimeout > 0) requestExit = !Sleep(m_releaseTimeout); @@ -242,8 +270,8 @@ void Worker() { TUnit unit; + //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); - Sleep(0); // remove wake request if the new thread is started do { // exit if requested if (m_exitRequired != 0) { @@ -269,10 +297,10 @@ // keep this thread and wait if (!Suspend()) break; - + //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); - + //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); } protected virtual void Dispose(bool disposing) {