Mercurial > pub > ImplabNet
diff Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | b0feb5b9ad1c |
children | 5a4b735ba669 |
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs Wed Nov 06 17:49:12 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Thu Nov 07 03:41:32 2013 +0400 @@ -6,66 +6,35 @@ using System.Diagnostics; namespace Implab.Parallels { - public class WorkerPool : IDisposable { - readonly int m_minThreads; - readonly int m_maxThreads; - int m_runningThreads; - object m_lock = new object(); - - bool m_disposed = false; - - // this event will signal that workers can try to fetch a task from queue or the pool has been disposed - ManualResetEvent m_hasTasks = new ManualResetEvent(false); - Queue<Action> m_queue = new Queue<Action>(); + public class WorkerPool : DispatchPool<Action> { - public WorkerPool(int min, int max) { - if (min < 0) - throw new ArgumentOutOfRangeException("min"); - if (max <= 0) - throw new ArgumentOutOfRangeException("max"); + MTQueue<Action> m_queue = new MTQueue<Action>(); + int m_queueLength = 0; - if (min > max) - min = max; - m_minThreads = min; - m_maxThreads = max; - - InitPool(); + public WorkerPool(int minThreads, int maxThreads) + : base(minThreads, maxThreads) { + InitPool(); } - public WorkerPool(int max) - : this(0, max) { + public WorkerPool(int threads) + : base(threads) { + InitPool(); } - public WorkerPool() { - int maxThreads, maxCP; - ThreadPool.GetMaxThreads(out maxThreads, out maxCP); - - m_minThreads = 0; - m_maxThreads = maxThreads; - - InitPool(); - } - - void InitPool() { - for (int i = 0; i < m_minThreads; i++) - StartWorker(); - } - - public int ThreadCount { - get { - return m_runningThreads; - } + public WorkerPool() + : base() { + InitPool(); } public Promise<T> Invoke<T>(Func<T> task) { - if (m_disposed) - throw new ObjectDisposedException(ToString()); if (task == null) throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); var promise = new Promise<T>(); - var queueLen = EnqueueTask(delegate() { + EnqueueTask(delegate() { try { promise.Resolve(task()); } catch (Exception e) { @@ -73,99 +42,28 @@ } }); - if (queueLen > 1) - StartWorker(); - return promise; } - bool StartWorker() { - var current = m_runningThreads; - // use spins to allocate slot for the new thread - do { - if (current >= m_maxThreads) - // no more slots left - return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - - // slot successfully allocated - - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); - - return true; - } - - int EnqueueTask(Action task) { - Debug.Assert(task != null); - lock (m_queue) { - m_queue.Enqueue(task); - m_hasTasks.Set(); - return m_queue.Count; - } + protected void EnqueueTask(Action unit) { + Debug.Assert(unit != null); + Interlocked.Increment(ref m_queueLength); + m_queue.Enqueue(unit); + // if there are sleeping threads in the pool wake one + // probably this will lead a dry run + WakeNewWorker(); } - bool FetchTask(out Action task) { - task = null; - - while (true) { - - m_hasTasks.WaitOne(); - - if (m_disposed) - return false; - - lock (m_queue) { - if (m_queue.Count > 0) { - task = m_queue.Dequeue(); - return true; - } - - // no tasks left - // signal that no more tasks left, current lock ensures that this event won't suppress newly added task - m_hasTasks.Reset(); - } - - bool exit = true; - - var current = m_runningThreads; - do { - if (current <= m_minThreads) { - exit = false; // this thread should return and wait for the new events - break; - } - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); - - if (exit) - return false; + protected override bool TryDequeue(out Action unit) { + if (m_queue.TryDequeue(out unit)) { + Interlocked.Decrement(ref m_queueLength); + return true; } + return false; } - void Worker() { - Action task; - while (FetchTask(out task)) - task(); - } - - protected virtual void Dispose(bool disposing) { - if (disposing) { - lock (m_lock) { - if (m_disposed) - return; - m_disposed = true; - } - m_hasTasks.Set(); - GC.SuppressFinalize(this); - } - } - - public void Dispose() { - Dispose(true); - } - - ~WorkerPool() { - Dispose(false); + protected override void InvokeUnit(Action unit) { + unit(); } } }