# HG changeset patch # User cin # Date 1383859542 -14400 # Node ID 7cd4a843b4e4084855ab73d2a036bffaa0705f4f # Parent 5a4b735ba66981d684eb477f0f5e697d2200bf82 Improved worker pool diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Thu Nov 07 20:20:26 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Fri Nov 08 01:25:42 2013 +0400 @@ -99,7 +99,7 @@ [TestMethod] public void WorkerPoolSizeTest() { - var pool = new WorkerPool(5, 10); + var pool = new WorkerPool(5, 10, 0); Assert.AreEqual(5, pool.ThreadCount); @@ -119,7 +119,7 @@ [TestMethod] public void WorkerPoolCorrectTest() { - var pool = new WorkerPool(); + var pool = new WorkerPool(0,1000,100); int iterations = 1000; int pending = iterations; @@ -244,7 +244,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(1,10)) { + using (var pool = new WorkerPool(8,100,0)) { int count = 10000; double[] args = new double[count]; diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab.v11.suo Binary file Implab.v11.suo has changed diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs Thu Nov 07 20:20:26 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Fri Nov 08 01:25:42 2013 +0400 @@ -62,7 +62,36 @@ } } - bool StartWorker() { + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual bool ExtendPool() { + if (m_suspended > 0) { + m_hasTasks.Set(); + return true; + } else + return StartWorker(); + } + + /// + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// + protected void WakePool() { + m_hasTasks.Set(); // wake sleeping thread; + + if (AllocateThreadSlot(1)) { + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + + #region thread slots traits + + bool AllocateThreadSlot() { int current; // use spins to allocate slot for the new thread do { @@ -72,38 +101,67 @@ return false; } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + UpdateMaxThreads(current + 1); + + return true; + } - // slot successfully allocated + bool AllocateThreadSlot(int desired) { + if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + return false; + + UpdateMaxThreads(desired); - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); + return true; + } + + bool ReleaseThreadSlot(out bool last) { + last = false; + int current; + // use spins to release slot for the new thread + do { + current = m_runningThreads; + if (current <= m_minThreads && m_exitRequired == 0) + // the thread is reserved + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + last = (current == 1); return true; } - protected abstract bool TryDequeue(out TUnit unit); - - protected virtual void WakeNewWorker(bool extend) { - if (m_suspended > 0) - m_hasTasks.Set(); - else - StartWorker(); + /// + /// releases thread slot unconditionally, used during cleanup + /// + /// true - no more threads left + bool ReleaseThreadSlotAnyway() { + var left = Interlocked.Decrement(ref m_runningThreads); + return left == 0; } - /// - /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока - /// - protected void StartIfIdle() { - int threads; + void UpdateMaxThreads(int count) { + int max; do { - - } + max = m_maxRunningThreads; + if (max >= count) + break; + } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); } - protected virtual void Suspend() { - m_hasTasks.WaitOne(); + #endregion + + bool StartWorker() { + if (AllocateThreadSlot()) { + // slot successfully allocated + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } else { + return false; + } } bool FetchTask(out TUnit unit) { @@ -111,35 +169,32 @@ // exit if requested if (m_exitRequired != 0) { // release the thread slot - var running = Interlocked.Decrement(ref m_runningThreads); - if (running == 0) // it was the last worker + if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else - m_hasTasks.Set(); // release next worker + m_hasTasks.Set(); // wake next worker unit = default(TUnit); return false; } // fetch task if (TryDequeue(out unit)) { - WakeNewWorker(true); + ExtendPool(); return true; } //no tasks left, exit if the thread is no longer needed - int runningThreads; - bool exit = true; - do { - runningThreads = m_runningThreads; - if (runningThreads <= m_minThreads) { - // check wheather this is the last thread and we have tasks + bool last; + if (ReleaseThreadSlot(out last)) { + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + continue; // spin again... + else + // we failed to reallocate slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } - exit = false; - break; - } - } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); - - if (exit) { return false; } diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs Thu Nov 07 20:20:26 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Fri Nov 08 01:25:42 2013 +0400 @@ -57,17 +57,16 @@ var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if (ThreadCount == 0) - // force to start - WakeNewWorker(false); + if(!ExtendPool()) + WakePool(); } - protected override void WakeNewWorker(bool extend) { - if (extend && m_queueLength <= m_threshold) + protected override bool ExtendPool() { + if (m_queueLength <= m_threshold*ThreadCount) // in this case we are in active thread and it request for additional workers // satisfy it only when queue is longer than threshold - return; - base.WakeNewWorker(extend); + return false; + return base.ExtendPool(); } protected override bool TryDequeue(out Action unit) {