Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 16:5a4b735ba669 promises
sync
| author | cin |
|---|---|
| date | Thu, 07 Nov 2013 20:20:26 +0400 |
| parents | 0f982f9b7d4d |
| children | 7cd4a843b4e4 |
comparison
equal
deleted
inserted
replaced
| 15:0f982f9b7d4d | 16:5a4b735ba669 |
|---|---|
| 8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
| 9 public class WorkerPool : DispatchPool<Action> { | 9 public class WorkerPool : DispatchPool<Action> { |
| 10 | 10 |
| 11 MTQueue<Action> m_queue = new MTQueue<Action>(); | 11 MTQueue<Action> m_queue = new MTQueue<Action>(); |
| 12 int m_queueLength = 0; | 12 int m_queueLength = 0; |
| 13 readonly int m_threshold = 1; | |
| 13 | 14 |
| 14 public WorkerPool(int minThreads, int maxThreads) | 15 public WorkerPool(int minThreads, int maxThreads, int threshold) |
| 15 : base(minThreads, maxThreads) { | 16 : base(minThreads, maxThreads) { |
| 16 InitPool(); | 17 m_threshold = threshold; |
| 18 InitPool(); | |
| 19 } | |
| 20 | |
| 21 public WorkerPool(int minThreads, int maxThreads) : | |
| 22 base(minThreads, maxThreads) { | |
| 23 InitPool(); | |
| 17 } | 24 } |
| 18 | 25 |
| 19 public WorkerPool(int threads) | 26 public WorkerPool(int threads) |
| 20 : base(threads) { | 27 : base(threads) { |
| 21 InitPool(); | 28 InitPool(); |
| 22 } | 29 } |
| 23 | 30 |
| 24 public WorkerPool() | 31 public WorkerPool() |
| 25 : base() { | 32 : base() { |
| 26 InitPool(); | 33 InitPool(); |
| 27 } | 34 } |
| 28 | 35 |
| 29 public Promise<T> Invoke<T>(Func<T> task) { | 36 public Promise<T> Invoke<T>(Func<T> task) { |
| 30 if (task == null) | 37 if (task == null) |
| 31 throw new ArgumentNullException("task"); | 38 throw new ArgumentNullException("task"); |
| 45 return promise; | 52 return promise; |
| 46 } | 53 } |
| 47 | 54 |
| 48 protected void EnqueueTask(Action unit) { | 55 protected void EnqueueTask(Action unit) { |
| 49 Debug.Assert(unit != null); | 56 Debug.Assert(unit != null); |
| 50 Interlocked.Increment(ref m_queueLength); | 57 var len = Interlocked.Increment(ref m_queueLength); |
| 51 m_queue.Enqueue(unit); | 58 m_queue.Enqueue(unit); |
| 52 // if there are sleeping threads in the pool wake one | 59 |
| 53 // probably this will lead a dry run | 60 if (ThreadCount == 0) |
| 54 WakeNewWorker(); | 61 // force to start |
| 62 WakeNewWorker(false); | |
| 63 } | |
| 64 | |
| 65 protected override void WakeNewWorker(bool extend) { | |
| 66 if (extend && m_queueLength <= m_threshold) | |
| 67 // in this case we are in active thread and it request for additional workers | |
| 68 // satisfy it only when queue is longer than threshold | |
| 69 return; | |
| 70 base.WakeNewWorker(extend); | |
| 55 } | 71 } |
| 56 | 72 |
| 57 protected override bool TryDequeue(out Action unit) { | 73 protected override bool TryDequeue(out Action unit) { |
| 58 if (m_queue.TryDequeue(out unit)) { | 74 if (m_queue.TryDequeue(out unit)) { |
| 59 Interlocked.Decrement(ref m_queueLength); | 75 Interlocked.Decrement(ref m_queueLength); |
| 63 } | 79 } |
| 64 | 80 |
| 65 protected override void InvokeUnit(Action unit) { | 81 protected override void InvokeUnit(Action unit) { |
| 66 unit(); | 82 unit(); |
| 67 } | 83 } |
| 84 | |
| 85 protected override void Suspend() { | |
| 86 if (m_queueLength == 0) | |
| 87 base.Suspend(); | |
| 88 } | |
| 68 } | 89 } |
| 69 } | 90 } |
