Mercurial > pub > ImplabNet
view Implab/Parallels/WorkerPool.cs @ 218:babe55c34931 v2
Trace logical operations in the correct channel
author | cin |
---|---|
date | Thu, 04 May 2017 02:17:53 +0300 |
parents | eb793fbbe4ea |
children |
line wrap: on
line source
using System;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A <c>DispatchPool&lt;Action&gt;</c> whose units of work are <see cref="Action"/>
    /// delegates. Work submitted through the <c>Invoke</c> overloads is wrapped in a
    /// delegate, placed in an internal queue and later executed by <see cref="InvokeUnit"/>.
    /// </summary>
    /// <remarks>
    /// Every <c>Invoke</c> overload captures <c>TraceContext.Instance.CurrentOperation</c>
    /// at the moment of the call and re-enters that logical operation when the queued
    /// delegate runs, leaving it again once the delegate is done.
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {

        // Pending units of work.
        AsyncQueue<Action> m_queue = new AsyncQueue<Action>();

        // Counter kept next to the queue: incremented in EnqueueTask,
        // decremented on every successful TryDequeue.
        int m_queueLength;

        // Multiplier used in EnqueueTask: StartWorker() is called when the counter value
        // obtained by the increment exceeds m_threshold * PoolSize.
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates the pool with explicit thread limits and an explicit threshold.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base constructor.</param>
        /// <param name="maxThreads">Forwarded to the base constructor.</param>
        /// <param name="threshold">
        /// Multiplier applied to <c>PoolSize</c> when <see cref="EnqueueTask"/> decides
        /// whether to call <c>StartWorker()</c>. The value is stored as is, without validation.
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            InitPool();
        }

        /// <summary>
        /// Creates the pool with explicit thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base constructor.</param>
        /// <param name="maxThreads">Forwarded to the base constructor.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool passing a single thread count to the base constructor;
        /// the threshold keeps its default of 1.
        /// </summary>
        /// <param name="threads">Forwarded to the base constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool using the parameterless base constructor;
        /// the threshold keeps its default of 1.
        /// </summary>
        public WorkerPool() {
            InitPool();
        }

        /// <summary>
        /// Queues a function for execution and returns a promise for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by <paramref name="task"/>.</typeparam>
        /// <param name="task">The function to run.</param>
        /// <returns>The <c>FuncTask&lt;T&gt;</c> created around <paramref name="task"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public IPromise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            // NOTE(review): the meaning of the (null, null, true) arguments is defined by
            // FuncTask, which is not visible here - confirm before changing them.
            var promise = new FuncTask<T>(task, null, null, true);

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    promise.Resolve();
                } finally {
                    // Leave even if Resolve() throws, so the entered operation is always
                    // balanced; this matches the cancellable overloads of this class.
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        /// <summary>
        /// Queues an action for execution and returns a promise for its completion.
        /// </summary>
        /// <param name="task">The action to run.</param>
        /// <returns>The <c>ActionTask</c> created around <paramref name="task"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public IPromise Invoke(Action task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            // NOTE(review): the meaning of the (null, null, true) arguments is defined by
            // ActionTask, which is not visible here - confirm before changing them.
            var promise = new ActionTask(task, null, null, true);

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    promise.Resolve();
                } finally {
                    // Leave even if Resolve() throws, so the entered operation is always
                    // balanced; this matches the cancellable overloads of this class.
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        /// <summary>
        /// Queues a cancellable function for execution and returns a promise for its result.
        /// The promise itself is passed to <paramref name="task"/> as the cancellation token.
        /// </summary>
        /// <remarks>
        /// When the queued delegate runs, <paramref name="task"/> is called only if
        /// <c>promise.CancelOperationIfRequested()</c> returns <c>false</c>. Any exception
        /// thrown inside the delegate's <c>try</c> block is passed to <c>promise.Reject</c>.
        /// </remarks>
        /// <typeparam name="T">Type of the value produced by <paramref name="task"/>.</typeparam>
        /// <param name="task">The function to run.</param>
        /// <returns>The promise that is resolved with the value returned by <paramref name="task"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    if (!promise.CancelOperationIfRequested())
                        promise.Resolve(task(promise));
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        /// <summary>
        /// Queues a cancellable action for execution and returns a promise for its completion.
        /// The promise itself is passed to <paramref name="task"/> as the cancellation token.
        /// </summary>
        /// <remarks>
        /// When the queued delegate runs, <paramref name="task"/> is called and the promise
        /// resolved only if <c>promise.CancelOperationIfRequested()</c> returns <c>false</c>.
        /// Any exception thrown inside the delegate's <c>try</c> block is passed to
        /// <c>promise.Reject</c>.
        /// </remarks>
        /// <typeparam name="T">
        /// Not referenced by the parameter list or by the body, so it cannot be inferred
        /// and has to be supplied explicitly at the call site.
        /// NOTE(review): looks unintended, but removing it would change the public
        /// signature, so it is kept as is.
        /// </typeparam>
        /// <param name="task">The action to run.</param>
        /// <returns>The promise that is resolved after <paramref name="task"/> returns.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public IPromise Invoke<T>(Action<ICancellationToken> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise();

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    if (!promise.CancelOperationIfRequested()) {
                        task(promise);
                        promise.Resolve();
                    }
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        /// <summary>
        /// Adds a unit of work to the queue.
        /// </summary>
        /// <param name="unit">The delegate to queue; must not be <c>null</c> (checked with a debug assertion only).</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // The counter is bumped before the unit becomes visible in the queue.
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            // Ask the base pool for another worker once the counter exceeds the
            // threshold scaled by PoolSize (presumably the current number of pool
            // threads - confirm against DispatchPool).
            if (len > m_threshold * PoolSize) {
                StartWorker();
            }

            // Called on every enqueue; presumably wakes a waiting worker - see DispatchPool.
            SignalThread();
        }

        /// <summary>
        /// Tries to take the next unit of work from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued delegate when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                // Keep the counter in step with the queue contents.
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Executes a unit of work by invoking the delegate.
        /// </summary>
        /// <param name="unit">The delegate to invoke.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }

    }
}