Mercurial > pub > ImplabNet
view Implab/Parallels/WorkerPool.cs @ 14:e943453e5039 promises
Implemented interllocked queue
fixed promise syncronization
author | cin |
---|---|
date | Wed, 06 Nov 2013 17:49:12 +0400 |
parents | b0feb5b9ad1c |
children | 0f982f9b7d4d |
line wrap: on
line source
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Diagnostics; namespace Implab.Parallels { public class WorkerPool : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; int m_runningThreads; object m_lock = new object(); bool m_disposed = false; // this event will signal that workers can try to fetch a task from queue or the pool has been disposed ManualResetEvent m_hasTasks = new ManualResetEvent(false); Queue<Action> m_queue = new Queue<Action>(); public WorkerPool(int min, int max) { if (min < 0) throw new ArgumentOutOfRangeException("min"); if (max <= 0) throw new ArgumentOutOfRangeException("max"); if (min > max) min = max; m_minThreads = min; m_maxThreads = max; InitPool(); } public WorkerPool(int max) : this(0, max) { } public WorkerPool() { int maxThreads, maxCP; ThreadPool.GetMaxThreads(out maxThreads, out maxCP); m_minThreads = 0; m_maxThreads = maxThreads; InitPool(); } void InitPool() { for (int i = 0; i < m_minThreads; i++) StartWorker(); } public int ThreadCount { get { return m_runningThreads; } } public Promise<T> Invoke<T>(Func<T> task) { if (m_disposed) throw new ObjectDisposedException(ToString()); if (task == null) throw new ArgumentNullException("task"); var promise = new Promise<T>(); var queueLen = EnqueueTask(delegate() { try { promise.Resolve(task()); } catch (Exception e) { promise.Reject(e); } }); if (queueLen > 1) StartWorker(); return promise; } bool StartWorker() { var current = m_runningThreads; // use spins to allocate slot for the new thread do { if (current >= m_maxThreads) // no more slots left return false; } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; worker.Start(); return true; } int EnqueueTask(Action task) { Debug.Assert(task != null); lock (m_queue) { m_queue.Enqueue(task); m_hasTasks.Set(); return m_queue.Count; } } bool FetchTask(out Action task) { task = null; while (true) { m_hasTasks.WaitOne(); if (m_disposed) return false; lock (m_queue) { if (m_queue.Count > 0) { task = m_queue.Dequeue(); return true; } // no tasks left // signal that no more tasks left, current lock ensures that this event won't suppress newly added task m_hasTasks.Reset(); } bool exit = true; var current = m_runningThreads; do { if (current <= m_minThreads) { exit = false; // this thread should return and wait for the new events break; } } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); if (exit) return false; } } void Worker() { Action task; while (FetchTask(out task)) task(); } protected virtual void Dispose(bool disposing) { if (disposing) { lock (m_lock) { if (m_disposed) return; m_disposed = true; } m_hasTasks.Set(); GC.SuppressFinalize(this); } } public void Dispose() { Dispose(true); } ~WorkerPool() { Dispose(false); } } }