Mercurial > pub > ImplabNet
diff Implab/Parallels/WorkerPool.cs @ 12:eb418ba8275b promises
refactoring, added WorkerPool
author | cin |
---|---|
date | Tue, 05 Nov 2013 19:55:34 +0400 |
parents | |
children | b0feb5b9ad1c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/WorkerPool.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public class WorkerPool : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads; + object m_lock = new object(); + + bool m_disposed = false; + ManualResetEvent m_hasTasks = new ManualResetEvent(false); + Queue<Action> m_queue = new Queue<Action>(); + + public WorkerPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public Promise<T> Invoke<T>(Func<T> task) { + if (m_disposed) + throw new ObjectDisposedException(ToString()); + if (task == null) + throw new ArgumentNullException("task"); + + var promise = new Promise<T>(); + + + + return promise; + } + + bool StartWorker() { + var current = m_runningThreads; + // use spins to allocate slot for the new thread + do { + if (current >= m_maxThreads) + // no more slots left + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + // slot successfully allocated + + var worker = new Thread(this.Worker); + worker.Start(); + + return true; + } + + void EnqueueTask(Action task) { + Debug.Assert(task != null); + lock (m_queue) { + m_queue.Enqueue(task); + m_hasTasks.Set(); + } + } + + bool FetchTask(out Action task) { + task = null; + + while (true) { + + m_hasTasks.WaitOne(); + + if (m_disposed) + return false; + + lock (m_queue) { + if (m_queue.Count > 0) { + task = m_queue.Dequeue(); + return true; + } + + // no tasks left + // signal that no more tasks left, lock ensures that this event won't suppress newly added task + m_hasTasks.Reset(); + } + + bool exit = true; + + var current = m_runningThreads; + do { + if (current <= m_minThreads) { + exit = false; // this thread should return and wait for the new events + break; + } + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + if (exit) + return false; + } + } + + void Worker() { + Action task; + while (FetchTask(out task)) + task(); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + lock (m_lock) { + if (m_disposed) + return; + m_disposed = true; + } + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + + public void Dispose() { + Dispose(true); + } + + ~WorkerPool() { + Dispose(false); + } + } +}