Mercurial > pub > ImplabNet
diff Implab/Parallels/DispatchPool.cs @ 20:1c3b3d518480 promises
refactoring, sync
author | cin |
---|---|
date | Tue, 12 Nov 2013 02:27:22 +0400 |
parents | 7cd4a843b4e4 |
children | 6a56df4ec59e |
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs Sun Nov 10 00:21:33 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Tue Nov 12 02:27:22 2013 +0400 @@ -9,10 +9,12 @@ public abstract class DispatchPool<TUnit> : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; - int m_runningThreads = 0; + int m_createdThreads = 0; + int m_activeThreads = 0; + int m_sleepingThreads = 0; int m_maxRunningThreads = 0; - int m_suspended = 0; int m_exitRequired = 0; + int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit AutoResetEvent m_hasTasks = new AutoResetEvent(false); protected DispatchPool(int min, int max) { @@ -44,9 +46,15 @@ StartWorker(); } - public int ThreadCount { + public int PoolSize { get { - return m_runningThreads; + return m_createdThreads; + } + } + + public int ActiveThreads { + get { + return m_activeThreads; } } @@ -65,11 +73,18 @@ protected abstract bool TryDequeue(out TUnit unit); protected virtual bool ExtendPool() { - if (m_suspended > 0) { - m_hasTasks.Set(); + if (m_sleepingThreads == 0) + // no sleeping workers are available + // try create one + return StartWorker(); + else { + // we can get here a race condition when several threads asks to extend pool + // and some sleaping threads are exited due timeout but they are still counted as sleeping + // in that case all of this threads could exit except one + WakePool(); return true; - } else - return StartWorker(); + } + } /// <summary> @@ -79,14 +94,50 @@ m_hasTasks.Set(); // wake sleeping thread; if (AllocateThreadSlot(1)) { + // if there were no threads in the pool var worker = new Thread(this.Worker); worker.IsBackground = true; worker.Start(); } } - protected virtual void Suspend() { - m_hasTasks.WaitOne(); + bool Sleep(int timeout) { + Interlocked.Increment(ref m_sleepingThreads); + var result = m_hasTasks.WaitOne(timeout); + Interlocked.Decrement(ref m_sleepingThreads); + return result; + } + + protected virtual bool Suspend() { + //no tasks left, exit if the thread is no longer needed + bool last; + bool requestExit; + + if (m_releaseTimeout > 0) + requestExit = !Sleep(m_releaseTimeout); + else + requestExit = true; + + + if (requestExit && ReleaseThreadSlot(out last)) { + // in case at the moment the last thread was being released + // a new task was added to the queue, we need to try + // to revoke the thread to avoid the situation when the task is left unprocessed + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + return true; // spin again... + else + // we failed to reallocate the first slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } + + return false; + } + + Sleep(-1); + + return true; } #region thread slots traits @@ -95,11 +146,11 @@ int current; // use spins to allocate slot for the new thread do { - current = m_runningThreads; + current = m_createdThreads; if (current >= m_maxThreads || m_exitRequired != 0) // no more slots left or the pool has been disposed return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); UpdateMaxThreads(current + 1); @@ -107,7 +158,7 @@ } bool AllocateThreadSlot(int desired) { - if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) return false; UpdateMaxThreads(desired); @@ -120,11 +171,11 @@ int current; // use spins to release slot for the new thread do { - current = m_runningThreads; + current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) // the thread is reserved return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); last = (current == 1); @@ -136,7 +187,7 @@ /// </summary> /// <returns>true - no more threads left</returns> bool ReleaseThreadSlotAnyway() { - var left = Interlocked.Decrement(ref m_runningThreads); + var left = Interlocked.Decrement(ref m_createdThreads); return left == 0; } @@ -169,6 +220,7 @@ // exit if requested if (m_exitRequired != 0) { // release the thread slot + Interlocked.Decrement(ref m_activeThreads); if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else @@ -183,26 +235,14 @@ return true; } - //no tasks left, exit if the thread is no longer needed - bool last; - if (ReleaseThreadSlot(out last)) { - if (last && m_hasTasks.WaitOne(0)) { - if (AllocateThreadSlot(1)) - continue; // spin again... - else - // we failed to reallocate slot for this thread - // therefore we need to release the event - m_hasTasks.Set(); - } - - return false; - } + Interlocked.Decrement(ref m_activeThreads); // entering suspend state - Interlocked.Increment(ref m_suspended); // keep this thread and wait - Suspend(); - Interlocked.Decrement(ref m_suspended); + if (!Suspend()) + return false; + + Interlocked.Increment(ref m_activeThreads); } while (true); } @@ -210,6 +250,7 @@ void Worker() { TUnit unit; + Interlocked.Increment(ref m_activeThreads); while (FetchTask(out unit)) InvokeUnit(unit); }