Mercurial > pub > ImplabNet
view Implab/Parallels/DispatchPool.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | |
children | 5a4b735ba669 |
line wrap: on
line source
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Diagnostics; namespace Implab.Parallels { public abstract class DispatchPool<TUnit> : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; int m_runningThreads = 0; int m_maxRunningThreads = 0; int m_suspended = 0; int m_exitRequired = 0; AutoResetEvent m_hasTasks = new AutoResetEvent(false); protected DispatchPool(int min, int max) { if (min < 0) throw new ArgumentOutOfRangeException("min"); if (max <= 0) throw new ArgumentOutOfRangeException("max"); if (min > max) min = max; m_minThreads = min; m_maxThreads = max; } protected DispatchPool(int threads) : this(threads, threads) { } protected DispatchPool() { int maxThreads, maxCP; ThreadPool.GetMaxThreads(out maxThreads, out maxCP); m_minThreads = 0; m_maxThreads = maxThreads; } protected void InitPool() { for (int i = 0; i < m_minThreads; i++) StartWorker(); } public int ThreadCount { get { return m_runningThreads; } } public int MaxRunningThreads { get { return m_maxRunningThreads; } } protected bool IsDisposed { get { return m_exitRequired != 0; } } bool StartWorker() { var current = m_runningThreads; // use spins to allocate slot for the new thread do { if (current >= m_maxThreads || m_exitRequired != 0) // no more slots left or the pool has been disposed return false; } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; worker.Start(); return true; } protected abstract bool TryDequeue(out TUnit unit); protected virtual void WakeNewWorker() { if (m_suspended > 0) m_hasTasks.Set(); else StartWorker(); } bool FetchTask(out TUnit unit) { do { // exit if requested if (m_exitRequired != 0) { // release the thread slot int running; do { running = m_runningThreads; } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); running--; if (running == 0) // it was the last worker m_hasTasks.Dispose(); else m_hasTasks.Set(); // release next worker unit = default(TUnit); return false; } // fetch task if (TryDequeue(out unit)) { WakeNewWorker(); return true; } //no tasks left, exit if the thread is no longer needed int runningThreads; bool exit = true; do { runningThreads = m_runningThreads; if (runningThreads <= m_minThreads) { exit = false; break; } } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); if (exit) { Interlocked.Decrement(ref m_runningThreads); return false; } // keep this thread and wait Interlocked.Increment(ref m_suspended); m_hasTasks.WaitOne(); Interlocked.Decrement(ref m_suspended); } while (true); } protected abstract void InvokeUnit(TUnit unit); void Worker() { TUnit unit; while (FetchTask(out unit)) InvokeUnit(unit); } protected virtual void Dispose(bool disposing) { if (disposing) { if (m_exitRequired == 0) { if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) return; // wake sleeping threads m_hasTasks.Set(); GC.SuppressFinalize(this); } } } public void Dispose() { Dispose(true); } ~DispatchPool() { Dispose(false); } } }