Mercurial > pub > ImplabNet
diff Implab/Parallels/WorkerPool.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | eb793fbbe4ea |
children |
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs Wed Sep 03 18:34:02 2014 +0400 +++ b/Implab/Parallels/WorkerPool.cs Fri Apr 22 13:10:34 2016 +0300 @@ -1,7 +1,4 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; using System.Threading; using System.Diagnostics; using Implab.Diagnostics; @@ -9,8 +6,8 @@ namespace Implab.Parallels { public class WorkerPool : DispatchPool<Action> { - MTQueue<Action> m_queue = new MTQueue<Action>(); - int m_queueLength = 0; + AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); + int m_queueLength; readonly int m_threshold = 1; public WorkerPool(int minThreads, int maxThreads, int threshold) @@ -29,12 +26,53 @@ InitPool(); } - public WorkerPool() - : base() { + public WorkerPool() { InitPool(); } - public Promise<T> Invoke<T>(Func<T> task) { + public IPromise<T> Invoke<T>(Func<T> task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new FuncTask<T>(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise Invoke(Action task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new ActionTask(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { if (task == null) throw new ArgumentNullException("task"); if (IsDisposed) @@ -42,16 +80,45 @@ var promise = new Promise<T>(); - var caller = TraceContext.Snapshot(); + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + try { + if (!promise.CancelOperationIfRequested()) + promise.Resolve(task(promise)); + } catch (Exception e) { + promise.Reject(e); + } finally { + TraceContext.Instance.Leave(); + } + }); + + return promise; + } - EnqueueTask(delegate() { - caller.Invoke(delegate() { - try { - promise.Resolve(task()); - } catch (Exception e) { - promise.Reject(e); + public IPromise Invoke<T>(Action<ICancellationToken> task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new Promise(); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + try { + if (!promise.CancelOperationIfRequested()) { + task(promise); + promise.Resolve(); } - }); + } catch (Exception e) { + promise.Reject(e); + } finally { + TraceContext.Instance.Leave(); + } }); return promise; @@ -62,8 +129,11 @@ var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if (len > m_threshold*ActiveThreads) - GrowPool(); + if (len > m_threshold * PoolSize) { + StartWorker(); + } + + SignalThread(); } protected override bool TryDequeue(out Action unit) { @@ -74,22 +144,6 @@ return false; } - protected override bool Suspend() { - // This override solves race condition - // WORKER CLIENT - // --------------------------------------- - // TryDeque == false - // Enqueue(unit), queueLen++ - // GrowPool? == NO - // ActiveThreads-- - // Suspend - // queueLength > 0 - // continue - if (m_queueLength > 0) - return true; - return base.Suspend(); - } - protected override void InvokeUnit(Action unit) { unit(); }