Mercurial > pub > ImplabNet
view Implab/Parallels/WorkerPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | fe33f4e02ad5 |
children | 2c5631b43c7d |
line wrap: on
line source
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A <see cref="DispatchPool{TUnit}"/> of <see cref="Action"/> units: queued
    /// delegates are handed to pool threads and executed by simply calling them.
    /// </summary>
    /// <remarks>
    /// The class keeps its own bookkeeping next to the base pool: a counter of
    /// queued units and a counter of workers it considers active. A growth request
    /// is sent to the base pool whenever the queue length exceeds
    /// <c>threshold * workers</c>.
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {

        // Units waiting to be executed.
        MTQueue<Action> m_queue = new MTQueue<Action>();

        // Number of queued units; incremented BEFORE the unit is put into the queue
        // and decremented after a successful dequeue.
        int m_queueLength;

        // Queue length per worker above which the pool is asked to grow.
        readonly int m_threshold = 1;

        // Worker count as tracked by this class: seeded by the constructors,
        // incremented on every growth request and decremented in Suspend().
        int m_workers;

        /// <summary>
        /// Creates a pool with the given thread limits and growth threshold.
        /// </summary>
        /// <param name="minThreads">Passed to the base pool; also the initial worker count.</param>
        /// <param name="maxThreads">Passed to the base pool.</param>
        /// <param name="threshold">Queued units per worker above which growth is requested.</param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            m_workers = minThreads;
            InitPool();
        }

        /// <summary>
        /// Creates a pool with the given thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Passed to the base pool; also the initial worker count.</param>
        /// <param name="maxThreads">Passed to the base pool.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : base(minThreads, maxThreads) {
            m_workers = minThreads;
            InitPool();
        }

        /// <summary>
        /// Creates a pool from a single thread count and the default threshold of 1.
        /// </summary>
        /// <param name="threads">Passed to the base pool; also the initial worker count.</param>
        public WorkerPool(int threads)
            : base(threads) {
            m_workers = threads;
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the base class defaults; the worker count starts at zero.
        /// </summary>
        public WorkerPool() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution and returns a promise for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to run on the pool.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by <paramref name="task"/>,
        /// or rejected with the exception it throws.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var result = new Promise<T>();

            // Snapshot taken on the calling thread; the queued unit runs the task
            // through it (presumably to carry the caller's trace context over to
            // the worker - see TraceContext).
            var context = TraceContext.Snapshot();

            EnqueueTask(() => {
                context.Invoke(() => {
                    try {
                        result.Resolve(task());
                    } catch (Exception error) {
                        result.Reject(error);
                    }
                });
            });

            return result;
        }

        /// <summary>
        /// Puts a unit into the queue and asks the pool to grow when the queue has
        /// become longer than <c>threshold * workers</c>.
        /// </summary>
        /// <param name="unit">The unit to execute; must not be <c>null</c>.</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // The counter is raised before the unit becomes visible in the queue;
            // Suspend() relies on this counter to detect a pending unit.
            var pending = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (pending > m_threshold * m_workers) {
                Interlocked.Increment(ref m_workers);
                GrowPool();
            }
        }

        /// <summary>
        /// Tries to take the next unit from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (!m_queue.TryDequeue(out unit))
                return false;

            Interlocked.Decrement(ref m_queueLength);
            return true;
        }

        /// <summary>
        /// Called when a worker found nothing to do. Returns <c>true</c> immediately
        /// if units are pending; otherwise decrements the worker counter and defers
        /// to the base implementation.
        /// </summary>
        /// <returns>
        /// <c>true</c> when the queue length is positive, otherwise the result of
        /// <c>base.Suspend()</c>.
        /// </returns>
        protected override bool Suspend() {
            // This override solves a race condition between a worker that is about
            // to suspend and a client that enqueues a unit at the same moment:
            //
            // WORKER                   CLIENT
            // ---------------------------------------
            // TryDeque == false
            //                          Enqueue(unit), queueLen++
            //                          GrowPool? == NO
            // ActiveThreads--
            // Suspend
            //    queueLength > 0
            // continue

            // Full fence before the counter is read.
            Thread.MemoryBarrier();

            if (m_queueLength > 0)
                return true;

            Interlocked.Decrement(ref m_workers);
            return base.Suspend();
        }

        /// <summary>
        /// Executes a dequeued unit by invoking the delegate.
        /// </summary>
        /// <param name="unit">The unit to run.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }
    }
}