# HG changeset patch
# User cin
# Date 1383859542 -14400
# Node ID 7cd4a843b4e4084855ab73d2a036bffaa0705f4f
# Parent 5a4b735ba66981d684eb477f0f5e697d2200bf82
Improved worker pool
diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs Thu Nov 07 20:20:26 2013 +0400
+++ b/Implab.Test/AsyncTests.cs Fri Nov 08 01:25:42 2013 +0400
@@ -99,7 +99,7 @@
[TestMethod]
public void WorkerPoolSizeTest() {
- var pool = new WorkerPool(5, 10);
+ var pool = new WorkerPool(5, 10, 0);
Assert.AreEqual(5, pool.ThreadCount);
@@ -119,7 +119,7 @@
[TestMethod]
public void WorkerPoolCorrectTest() {
- var pool = new WorkerPool();
+ var pool = new WorkerPool(0,1000,100);
int iterations = 1000;
int pending = iterations;
@@ -244,7 +244,7 @@
[TestMethod]
public void ChainedMapTest() {
- using (var pool = new WorkerPool(1,10)) {
+ using (var pool = new WorkerPool(8,100,0)) {
int count = 10000;
double[] args = new double[count];
diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab.v11.suo
Binary file Implab.v11.suo has changed
diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab/Parallels/DispatchPool.cs
--- a/Implab/Parallels/DispatchPool.cs Thu Nov 07 20:20:26 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs Fri Nov 08 01:25:42 2013 +0400
@@ -62,7 +62,36 @@
}
}
- bool StartWorker() {
+ protected abstract bool TryDequeue(out TUnit unit);
+
+ protected virtual bool ExtendPool() {
+ if (m_suspended > 0) {
+ m_hasTasks.Set();
+ return true;
+ } else
+ return StartWorker();
+ }
+
+ ///
+ /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
+ ///
+ protected void WakePool() {
+ m_hasTasks.Set(); // wake sleeping thread;
+
+ if (AllocateThreadSlot(1)) {
+ var worker = new Thread(this.Worker);
+ worker.IsBackground = true;
+ worker.Start();
+ }
+ }
+
+ protected virtual void Suspend() {
+ m_hasTasks.WaitOne();
+ }
+
+ #region thread slots traits
+
+ bool AllocateThreadSlot() {
int current;
// use spins to allocate slot for the new thread
do {
@@ -72,38 +101,67 @@
return false;
} while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
- m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1);
+ UpdateMaxThreads(current + 1);
+
+ return true;
+ }
- // slot successfully allocated
+ bool AllocateThreadSlot(int desired) {
+ if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
+ return false;
+
+ UpdateMaxThreads(desired);
- var worker = new Thread(this.Worker);
- worker.IsBackground = true;
- worker.Start();
+ return true;
+ }
+
+ bool ReleaseThreadSlot(out bool last) {
+ last = false;
+ int current;
+ // use spins to release slot for the new thread
+ do {
+ current = m_runningThreads;
+ if (current <= m_minThreads && m_exitRequired == 0)
+ // the thread is reserved
+ return false;
+ } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+
+ last = (current == 1);
return true;
}
- protected abstract bool TryDequeue(out TUnit unit);
-
- protected virtual void WakeNewWorker(bool extend) {
- if (m_suspended > 0)
- m_hasTasks.Set();
- else
- StartWorker();
+ ///
+ /// releases thread slot unconditionally, used during cleanup
+ ///
+ /// true - no more threads left
+ bool ReleaseThreadSlotAnyway() {
+ var left = Interlocked.Decrement(ref m_runningThreads);
+ return left == 0;
}
- ///
- /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
- ///
- protected void StartIfIdle() {
- int threads;
+ void UpdateMaxThreads(int count) {
+ int max;
do {
-
- }
+ max = m_maxRunningThreads;
+ if (max >= count)
+ break;
+ } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
}
- protected virtual void Suspend() {
- m_hasTasks.WaitOne();
+ #endregion
+
+ bool StartWorker() {
+ if (AllocateThreadSlot()) {
+ // slot successfully allocated
+ var worker = new Thread(this.Worker);
+ worker.IsBackground = true;
+ worker.Start();
+
+ return true;
+ } else {
+ return false;
+ }
}
bool FetchTask(out TUnit unit) {
@@ -111,35 +169,32 @@
// exit if requested
if (m_exitRequired != 0) {
// release the thread slot
- var running = Interlocked.Decrement(ref m_runningThreads);
- if (running == 0) // it was the last worker
+ if (ReleaseThreadSlotAnyway()) // it was the last worker
m_hasTasks.Dispose();
else
- m_hasTasks.Set(); // release next worker
+ m_hasTasks.Set(); // wake next worker
unit = default(TUnit);
return false;
}
// fetch task
if (TryDequeue(out unit)) {
- WakeNewWorker(true);
+ ExtendPool();
return true;
}
//no tasks left, exit if the thread is no longer needed
- int runningThreads;
- bool exit = true;
- do {
- runningThreads = m_runningThreads;
- if (runningThreads <= m_minThreads) {
- // check wheather this is the last thread and we have tasks
+ bool last;
+ if (ReleaseThreadSlot(out last)) {
+ if (last && m_hasTasks.WaitOne(0)) {
+ if (AllocateThreadSlot(1))
+ continue; // spin again...
+ else
+ // we failed to reallocate slot for this thread
+ // therefore we need to release the event
+ m_hasTasks.Set();
+ }
- exit = false;
- break;
- }
- } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
-
- if (exit) {
return false;
}
diff -r 5a4b735ba669 -r 7cd4a843b4e4 Implab/Parallels/WorkerPool.cs
--- a/Implab/Parallels/WorkerPool.cs Thu Nov 07 20:20:26 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs Fri Nov 08 01:25:42 2013 +0400
@@ -57,17 +57,16 @@
var len = Interlocked.Increment(ref m_queueLength);
m_queue.Enqueue(unit);
- if (ThreadCount == 0)
- // force to start
- WakeNewWorker(false);
+ if(!ExtendPool())
+ WakePool();
}
- protected override void WakeNewWorker(bool extend) {
- if (extend && m_queueLength <= m_threshold)
+ protected override bool ExtendPool() {
+ if (m_queueLength <= m_threshold*ThreadCount)
// in this case we are in active thread and it request for additional workers
// satisfy it only when queue is longer than threshold
- return;
- base.WakeNewWorker(extend);
+ return false;
+ return base.ExtendPool();
}
protected override bool TryDequeue(out Action unit) {