# HG changeset patch # User cin # Date 1384208842 -14400 # Node ID 1c3b3d518480cc457fb114ffac1ff53565d383a4 # Parent e3935fdf59a2a7ec7e38de609b39531519cf4ea3 refactoring, sync diff -r e3935fdf59a2 -r 1c3b3d518480 Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Sun Nov 10 00:21:33 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Tue Nov 12 02:27:22 2013 +0400 @@ -101,18 +101,18 @@ public void WorkerPoolSizeTest() { var pool = new WorkerPool(5, 10, 0); - Assert.AreEqual(5, pool.ThreadCount); + Assert.AreEqual(5, pool.PoolSize); pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); - Assert.AreEqual(5, pool.ThreadCount); + Assert.AreEqual(5, pool.PoolSize); for (int i = 0; i < 100; i++) pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); Thread.Sleep(100); - Assert.AreEqual(10, pool.ThreadCount); + Assert.AreEqual(10, pool.PoolSize); pool.Dispose(); } @@ -149,10 +149,10 @@ [TestMethod] public void WorkerPoolDisposeTest() { var pool = new WorkerPool(5, 20); - Assert.AreEqual(5, pool.ThreadCount); + Assert.AreEqual(5, pool.PoolSize); pool.Dispose(); - Thread.Sleep(100); - Assert.AreEqual(0, pool.ThreadCount); + Thread.Sleep(200); + Assert.AreEqual(0, pool.PoolSize); pool.Dispose(); } @@ -244,7 +244,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(4,4,0)) { + using (var pool = new WorkerPool(0,100,0)) { int count = 10000; double[] args = new double[count]; diff -r e3935fdf59a2 -r 1c3b3d518480 Implab.v11.suo Binary file Implab.v11.suo has changed diff -r e3935fdf59a2 -r 1c3b3d518480 Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs Sun Nov 10 00:21:33 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Tue Nov 12 02:27:22 2013 +0400 @@ -9,10 +9,12 @@ public abstract class DispatchPool : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; - int m_runningThreads = 0; + int m_createdThreads = 0; + int m_activeThreads = 0; + int m_sleepingThreads = 0; int m_maxRunningThreads = 0; - int m_suspended = 0; int m_exitRequired = 0; + int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit AutoResetEvent m_hasTasks = new AutoResetEvent(false); protected DispatchPool(int min, int max) { @@ -44,9 +46,15 @@ StartWorker(); } - public int ThreadCount { + public int PoolSize { get { - return m_runningThreads; + return m_createdThreads; + } + } + + public int ActiveThreads { + get { + return m_activeThreads; } } @@ -65,11 +73,18 @@ protected abstract bool TryDequeue(out TUnit unit); protected virtual bool ExtendPool() { - if (m_suspended > 0) { - m_hasTasks.Set(); + if (m_sleepingThreads == 0) + // no sleeping workers are available + // try create one + return StartWorker(); + else { + // we can get here a race condition when several threads asks to extend pool + // and some sleaping threads are exited due timeout but they are still counted as sleeping + // in that case all of this threads could exit except one + WakePool(); return true; - } else - return StartWorker(); + } + } /// @@ -79,14 +94,50 @@ m_hasTasks.Set(); // wake sleeping thread; if (AllocateThreadSlot(1)) { + // if there were no threads in the pool var worker = new Thread(this.Worker); worker.IsBackground = true; worker.Start(); } } - protected virtual void Suspend() { - m_hasTasks.WaitOne(); + bool Sleep(int timeout) { + Interlocked.Increment(ref m_sleepingThreads); + var result = m_hasTasks.WaitOne(timeout); + Interlocked.Decrement(ref m_sleepingThreads); + return result; + } + + protected virtual bool Suspend() { + //no tasks left, exit if the thread is no longer needed + bool last; + bool requestExit; + + if (m_releaseTimeout > 0) + requestExit = !Sleep(m_releaseTimeout); + else + requestExit = true; + + + if (requestExit && ReleaseThreadSlot(out last)) { + // in case at the moment the last thread was being released + // a new task was added to the queue, we need to try + // to revoke the thread to avoid the situation when the task is left unprocessed + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + return true; // spin again... + else + // we failed to reallocate the first slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } + + return false; + } + + Sleep(-1); + + return true; } #region thread slots traits @@ -95,11 +146,11 @@ int current; // use spins to allocate slot for the new thread do { - current = m_runningThreads; + current = m_createdThreads; if (current >= m_maxThreads || m_exitRequired != 0) // no more slots left or the pool has been disposed return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); UpdateMaxThreads(current + 1); @@ -107,7 +158,7 @@ } bool AllocateThreadSlot(int desired) { - if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) return false; UpdateMaxThreads(desired); @@ -120,11 +171,11 @@ int current; // use spins to release slot for the new thread do { - current = m_runningThreads; + current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) // the thread is reserved return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); last = (current == 1); @@ -136,7 +187,7 @@ /// /// true - no more threads left bool ReleaseThreadSlotAnyway() { - var left = Interlocked.Decrement(ref m_runningThreads); + var left = Interlocked.Decrement(ref m_createdThreads); return left == 0; } @@ -169,6 +220,7 @@ // exit if requested if (m_exitRequired != 0) { // release the thread slot + Interlocked.Decrement(ref m_activeThreads); if (ReleaseThreadSlotAnyway()) // it was the last worker m_hasTasks.Dispose(); else @@ -183,26 +235,14 @@ return true; } - //no tasks left, exit if the thread is no longer needed - bool last; - if (ReleaseThreadSlot(out last)) { - if (last && m_hasTasks.WaitOne(0)) { - if (AllocateThreadSlot(1)) - continue; // spin again... - else - // we failed to reallocate slot for this thread - // therefore we need to release the event - m_hasTasks.Set(); - } - - return false; - } + Interlocked.Decrement(ref m_activeThreads); // entering suspend state - Interlocked.Increment(ref m_suspended); // keep this thread and wait - Suspend(); - Interlocked.Decrement(ref m_suspended); + if (!Suspend()) + return false; + + Interlocked.Increment(ref m_activeThreads); } while (true); } @@ -210,6 +250,7 @@ void Worker() { TUnit unit; + Interlocked.Increment(ref m_activeThreads); while (FetchTask(out unit)) InvokeUnit(unit); } diff -r e3935fdf59a2 -r 1c3b3d518480 Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs Sun Nov 10 00:21:33 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Tue Nov 12 02:27:22 2013 +0400 @@ -57,12 +57,11 @@ var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if(!ExtendPool()) - WakePool(); + ExtendPool(); } protected override bool ExtendPool() { - if (m_queueLength <= m_threshold*ThreadCount) + if (m_queueLength <= m_threshold*ActiveThreads) // in this case we are in active thread and it request for additional workers // satisfy it only when queue is longer than threshold return false; @@ -81,9 +80,11 @@ unit(); } - protected override void Suspend() { + protected override bool Suspend() { if (m_queueLength == 0) - base.Suspend(); + return base.Suspend(); + else + return true; // spin again without locks... } } }