Mercurial > pub > ImplabNet
diff Implab/Parallels/DispatchPool.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | |
children | 5a4b735ba669 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/DispatchPool.cs Thu Nov 07 03:41:32 2013 +0400 @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public abstract class DispatchPool<TUnit> : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads = 0; + int m_maxRunningThreads = 0; + int m_suspended = 0; + int m_exitRequired = 0; + AutoResetEvent m_hasTasks = new AutoResetEvent(false); + + protected DispatchPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + } + + protected DispatchPool(int threads) + : this(threads, threads) { + } + + protected DispatchPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + } + + protected void InitPool() { + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public int ThreadCount { + get { + return m_runningThreads; + } + } + + public int MaxRunningThreads { + get { + return m_maxRunningThreads; + } + } + + protected bool IsDisposed { + get { + return m_exitRequired != 0; + } + } + + bool StartWorker() { + var current = m_runningThreads; + // use spins to allocate slot for the new thread + do { + if (current >= m_maxThreads || m_exitRequired != 0) + // no more slots left or the pool has been disposed + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + + // slot successfully allocated + + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } + + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual void WakeNewWorker() { + if (m_suspended > 0) + m_hasTasks.Set(); + else + StartWorker(); + } + + bool FetchTask(out TUnit unit) { + do { + // exit if requested + if (m_exitRequired != 0) { + // release the thread slot + int running; + do { + running = m_runningThreads; + } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); + running--; + + if (running == 0) // it was the last worker + m_hasTasks.Dispose(); + else + m_hasTasks.Set(); // release next worker + unit = default(TUnit); + return false; + } + + // fetch task + if (TryDequeue(out unit)) { + WakeNewWorker(); + return true; + } + + //no tasks left, exit if the thread is no longer needed + int runningThreads; + bool exit = true; + do { + runningThreads = m_runningThreads; + if (runningThreads <= m_minThreads) { + exit = false; + break; + } + } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); + + if (exit) { + Interlocked.Decrement(ref m_runningThreads); + return false; + } + + // keep this thread and wait + Interlocked.Increment(ref m_suspended); + m_hasTasks.WaitOne(); + Interlocked.Decrement(ref m_suspended); + } while (true); + } + + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + while (FetchTask(out unit)) + InvokeUnit(unit); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + if (m_exitRequired == 0) { + if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) + return; + + // wake sleeping threads + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + } + + public void Dispose() { + Dispose(true); + } + + ~DispatchPool() { + Dispose(false); + } + } +}