Mercurial > pub > ImplabNet
view Implab/Parallels/WorkerPool.cs @ 12:eb418ba8275b promises
refactoring, added WorkerPool
author | cin |
---|---|
date | Tue, 05 Nov 2013 19:55:34 +0400 |
parents | |
children | b0feb5b9ad1c |
line wrap: on
line source
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A pool of dedicated worker threads that executes queued delegates.
    /// The pool starts <c>min</c> workers up front, starts additional workers (up to <c>max</c>)
    /// when more tasks are queued than there are parked workers, and lets surplus workers
    /// retire once the queue is empty.
    /// </summary>
    public class WorkerPool : IDisposable {
        readonly int m_minThreads;
        readonly int m_maxThreads;

        // Number of allocated worker slots. Modified only through Interlocked operations.
        int m_runningThreads;

        // Number of workers parked on m_hasTasks. Guarded by lock (m_queue).
        int m_idleThreads;

        // Written under lock (m_queue); volatile because Invoke reads it without the lock.
        volatile bool m_disposed = false;

        // Signaled while the queue is non-empty or the pool is disposed.
        // Set/Reset are always performed under lock (m_queue) so a Reset can never hide a newer Set.
        readonly ManualResetEvent m_hasTasks = new ManualResetEvent(false);
        readonly Queue<Action> m_queue = new Queue<Action>();

        /// <summary>
        /// Creates the pool and immediately starts the minimum number of workers.
        /// </summary>
        /// <param name="min">Number of workers kept alive while idle. Values above <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">Upper bound on the number of simultaneously running workers.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        public WorkerPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            // a pool without a single slot could never execute anything
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");
            if (min > max)
                min = max;

            m_minThreads = min;
            m_maxThreads = max;

            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution on a pool thread.
        /// </summary>
        /// <param name="task">The function to execute.</param>
        /// <returns>The promise created for this invocation.</returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (m_disposed)
                throw new ObjectDisposedException(ToString());
            if (task == null)
                throw new ArgumentNullException("task");

            var promise = new Promise<T>();

            // NOTE(review): Promise<T> is declared outside this file; this assumes it exposes
            // Resolve(T) and Reject(Exception) - confirm against the Promise<T> definition.
            EnqueueTask(() => {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    // route the failure to the promise instead of killing the worker thread
                    promise.Reject(e);
                }
            });

            return promise;
        }

        /// <summary>
        /// Tries to allocate a slot and start one more worker thread.
        /// </summary>
        /// <returns><c>true</c> if a worker was started; <c>false</c> if the pool is already at its maximum.</returns>
        bool StartWorker() {
            // Spin to allocate a slot for the new thread. The value returned by the failed
            // compare-exchange is reused so every retry works with a fresh counter value.
            int current = m_runningThreads;
            while (true) {
                if (current >= m_maxThreads)
                    return false; // no more slots left

                int observed = Interlocked.CompareExchange(ref m_runningThreads, current + 1, current);
                if (observed == current)
                    break; // slot successfully allocated
                current = observed;
            }

            var worker = new Thread(this.Worker);
            // pool threads must not keep the process alive when the pool is never disposed
            worker.IsBackground = true;
            worker.Start();

            return true;
        }

        /// <summary>
        /// Tries to give up the calling worker's slot.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the slot was released and the caller must exit;
        /// <c>false</c> if the pool is at (or below) its minimum size and the caller must stay.
        /// </returns>
        bool TryReleaseSlot() {
            int current = m_runningThreads;
            while (current > m_minThreads) {
                int observed = Interlocked.CompareExchange(ref m_runningThreads, current - 1, current);
                if (observed == current)
                    return true;
                current = observed;
            }
            return false;
        }

        /// <summary>
        /// Adds a task to the queue, wakes parked workers and starts an extra worker
        /// when there are more queued tasks than parked workers.
        /// </summary>
        void EnqueueTask(Action task) {
            Debug.Assert(task != null);

            bool needWorker;
            lock (m_queue) {
                m_queue.Enqueue(task);
                m_hasTasks.Set();
                // parked workers take one task each; anything beyond that needs another thread
                needWorker = m_queue.Count > m_idleThreads;
            }

            if (needWorker)
                StartWorker(); // a no-op when the pool is already at its maximum
        }

        /// <summary>
        /// Blocks the calling worker until a task is available.
        /// </summary>
        /// <param name="task">Receives the dequeued task, or <c>null</c> when the method returns <c>false</c>.</param>
        /// <returns>
        /// <c>true</c> if a task was dequeued; <c>false</c> if the worker must exit because the pool
        /// was disposed or because the queue is empty and the pool has more workers than its minimum.
        /// </returns>
        bool FetchTask(out Action task) {
            task = null;
            bool parked = false;

            while (true) {
                lock (m_queue) {
                    if (parked) {
                        m_idleThreads--;
                        parked = false;
                    }

                    // checked under the lock so this worker can never Reset the event
                    // after Dispose has signaled it
                    if (m_disposed)
                        return false;

                    if (m_queue.Count > 0) {
                        task = m_queue.Dequeue();
                        return true;
                    }

                    // no tasks left: the lock ensures this Reset won't suppress a newly added task
                    m_hasTasks.Reset();

                    // Retire while still holding the lock, so a concurrent EnqueueTask either
                    // sees this worker as gone (and starts a new one) or hands it the task.
                    if (TryReleaseSlot())
                        return false;

                    m_idleThreads++;
                    parked = true;
                }

                m_hasTasks.WaitOne();
            }
        }

        /// <summary>
        /// Worker thread entry point: runs tasks until <see cref="FetchTask"/> tells the thread to exit.
        /// </summary>
        void Worker() {
            Action task;
            while (FetchTask(out task))
                task();
        }

        /// <summary>
        /// Marks the pool as disposed and wakes all parked workers so they can exit.
        /// Tasks that are still queued at that moment are not executed.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; when <c>false</c> (finalizer) nothing is done.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                lock (m_queue) {
                    if (m_disposed)
                        return;
                    m_disposed = true;
                    // Signaled under the queue lock and never reset afterwards (see FetchTask).
                    // The event is intentionally not closed: workers may still be waiting on it.
                    m_hasTasks.Set();
                }
                GC.SuppressFinalize(this);
            }
        }

        /// <summary>
        /// Stops the pool; see <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~WorkerPool() {
            Dispose(false);
        }
    }
}