Mercurial > pub > ImplabNet
diff Implab/Parallels/DispatchPool.cs @ 17:7cd4a843b4e4 promises
Improved worker pool
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:25:42 +0400 |
parents | 5a4b735ba669 |
children | 1c3b3d518480 |
line wrap: on
line diff
--- a/Implab/Parallels/DispatchPool.cs Thu Nov 07 20:20:26 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Fri Nov 08 01:25:42 2013 +0400 @@ -62,7 +62,36 @@ } } - bool StartWorker() { + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual bool ExtendPool() { + if (m_suspended > 0) { + m_hasTasks.Set(); + return true; + } else + return StartWorker(); + } + + /// <summary> + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// </summary> + protected void WakePool() { + m_hasTasks.Set(); // wake sleeping thread; + + if (AllocateThreadSlot(1)) { + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + + #region thread slots traits + + bool AllocateThreadSlot() { int current; // use spins to allocate slot for the new thread do { @@ -72,38 +101,67 @@ return false; } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + UpdateMaxThreads(current + 1); + + return true; + } - // slot successfully allocated + bool AllocateThreadSlot(int desired) { + if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + return false; + + UpdateMaxThreads(desired); - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); + return true; + } + + bool ReleaseThreadSlot(out bool last) { + last = false; + int current; + // use spins to release slot for the new thread + do { + current = m_runningThreads; + if (current <= m_minThreads && m_exitRequired == 0) + // the thread is reserved + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + last = (current == 1); return true; } - protected abstract bool TryDequeue(out TUnit unit); - - protected virtual void WakeNewWorker(bool extend) { - if (m_suspended > 0) - m_hasTasks.Set(); - else - StartWorker(); + /// <summary> + /// releases thread slot unconditionally, used during cleanup + /// </summary> + /// <returns>true - no more threads left</returns> + bool ReleaseThreadSlotAnyway() { + var left = Interlocked.Decrement(ref m_runningThreads); + return left == 0; } - /// <summary> - /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока - /// </summary> - protected void StartIfIdle() { - int threads; + void UpdateMaxThreads(int count) { + int max; do { - - } + max = m_maxRunningThreads; + if (max >= count) + break; + } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); } - protected virtual void Suspend() { - m_hasTasks.WaitOne(); + #endregion + + bool StartWorker() { + if (AllocateThreadSlot()) { + // slot successfully allocated + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } else { + return false; + } } bool FetchTask(out TUnit unit) { @@ -111,35 +169,32 @@ // exit if requested if (m_exitRequired != 0) { // release the thread slot - var running = Interlocked.Decrement(ref m_runningThreads); - if (running == 0) // it was the last worker + if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else - m_hasTasks.Set(); // release next worker + m_hasTasks.Set(); // wake next worker unit = default(TUnit); return false; } // fetch task if (TryDequeue(out unit)) { - WakeNewWorker(true); + ExtendPool(); return true; } //no tasks left, exit if the thread is no longer needed - int runningThreads; - bool exit = true; - do { - runningThreads = m_runningThreads; - if (runningThreads <= m_minThreads) { - // check wheather this is the last thread and we have tasks + bool last; + if (ReleaseThreadSlot(out last)) { + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + continue; // spin again... + else + // we failed to reallocate slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } - exit = false; - break; - } - } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); - - if (exit) { return false; }