Mercurial > pub > ImplabNet
diff Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | 2fc0fbe7d58b |
children | 2c5631b43c7d |
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/DispatchPool.cs Fri Sep 26 03:32:34 2014 +0400 @@ -9,16 +9,17 @@ public abstract class DispatchPool<TUnit> : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; + readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit int m_createdThreads = 0; // the current size of the pool int m_activeThreads = 0; // the count of threads which are active int m_sleepingThreads = 0; // the count of currently inactive threads int m_maxRunningThreads = 0; // the meximum reached size of the pool int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released - int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit + int m_wakeEvents = 0; // the count of wake events - AutoResetEvent m_hasTasks = new AutoResetEvent(false); + readonly object m_signalLocker = new object(); protected DispatchPool(int min, int max) { if (min < 0) @@ -51,68 +52,76 @@ public int PoolSize { get { + Thread.MemoryBarrier(); return m_createdThreads; } } public int ActiveThreads { get { + Thread.MemoryBarrier(); return m_activeThreads; } } public int MaxRunningThreads { get { + Thread.MemoryBarrier(); return m_maxRunningThreads; } } protected bool IsDisposed { get { - return m_exitRequired != 0; + Thread.MemoryBarrier(); + return m_exitRequired == 1; } } protected abstract bool TryDequeue(out TUnit unit); - #region thread execution traits + #region thread signaling traits int SignalThread() { var signals = Interlocked.Increment(ref m_wakeEvents); if(signals == 1) - m_hasTasks.Set(); + lock(m_signalLocker) + Monitor.Pulse(m_signalLocker); return signals; } bool FetchSignalOrWait(int timeout) { var start = Environment.TickCount; - - // означает, что поток владеет блокировкой и при успешном получении сигнала должен - // ее вернуть, чтобы другой ожидающий поток смог - bool hasLock = false; + int signals; + Thread.MemoryBarrier(); // m_wakeEvents volatile first read do { - int signals; - do { - signals = m_wakeEvents; - if (signals == 0) - break; - } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); + signals = m_wakeEvents; + if (signals == 0) + break; + } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); - if (signals >= 1) { - if (signals > 1 && hasLock) - m_hasTasks.Set(); + if (signals == 0) { + // no signal is fetched + lock(m_signalLocker) { + while(m_wakeEvents == 0) { + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + if(!Monitor.Wait(m_signalLocker,timeout)) + return false; // timeout + } + // m_wakeEvents > 0 + if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized + Monitor.Pulse(m_signalLocker); + + // signal fetched return true; } - if (timeout != -1) - timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + } else { + // signal fetched + return true; + } - // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие - // и уйдет на пустой цикл, после чего заблокируется - hasLock = true; - } while (m_hasTasks.WaitOne(timeout)); - - return false; } bool Sleep(int timeout) { @@ -131,7 +140,8 @@ /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока /// </summary> protected void GrowPool() { - if (m_exitRequired != 0) + Thread.MemoryBarrier(); + if (m_exitRequired == 1) return; if (m_sleepingThreads > m_wakeEvents) { //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); @@ -204,7 +214,7 @@ // use spins to allocate slot for the new thread do { current = m_createdThreads; - if (current >= m_maxThreads || m_exitRequired != 0) + if (current >= m_maxThreads || m_exitRequired == 1) // no more slots left or the pool has been disposed return false; } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); @@ -227,6 +237,7 @@ last = false; int current; // use spins to release slot for the new thread + Thread.MemoryBarrier(); do { current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) @@ -264,6 +275,7 @@ // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; + Interlocked.Increment(ref m_activeThreads); worker.Start(); return true; @@ -277,15 +289,14 @@ protected virtual void Worker() { TUnit unit; //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); - Interlocked.Increment(ref m_activeThreads); + int count = 0;; + Thread.MemoryBarrier(); do { // exit if requested - if (m_exitRequired != 0) { + if (m_exitRequired == 1) { // release the thread slot Interlocked.Decrement(ref m_activeThreads); - if (ReleaseThreadSlotAnyway()) // it was the last worker - m_hasTasks.Dispose(); - else + if (!ReleaseThreadSlotAnyway()) // it was the last worker SignalThread(); // wake next worker break; } @@ -293,14 +304,17 @@ // fetch task if (TryDequeue(out unit)) { InvokeUnit(unit); + count ++; continue; } Interlocked.Decrement(ref m_activeThreads); + Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); // entering suspend state // keep this thread and wait if (!Suspend()) break; + count = 0; //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); @@ -309,15 +323,10 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - if (m_exitRequired == 0) { - if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) - return; - + if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier // wake sleeping threads if (m_createdThreads > 0) SignalThread(); - else - m_hasTasks.Dispose(); GC.SuppressFinalize(this); } }