Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 16:5a4b735ba669 promises
sync
author | cin |
---|---|
date | Thu, 07 Nov 2013 20:20:26 +0400 |
parents | 0f982f9b7d4d |
children | 7cd4a843b4e4 |
comparison legend: equal, deleted, inserted, replaced
15:0f982f9b7d4d | 16:5a4b735ba669 |
---|---|
8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
9 public class WorkerPool : DispatchPool<Action> { | 9 public class WorkerPool : DispatchPool<Action> { |
10 | 10 |
11 MTQueue<Action> m_queue = new MTQueue<Action>(); | 11 MTQueue<Action> m_queue = new MTQueue<Action>(); |
12 int m_queueLength = 0; | 12 int m_queueLength = 0; |
| | 13 readonly int m_threshold = 1; |
13 | 14 |
14 public WorkerPool(int minThreads, int maxThreads) | 15 public WorkerPool(int minThreads, int maxThreads, int threshold) |
15 : base(minThreads, maxThreads) { | 16 : base(minThreads, maxThreads) { |
16 InitPool(); | 17 m_threshold = threshold; |
| | 18 InitPool(); |
| | 19 } |
| | 20 |
| | 21 public WorkerPool(int minThreads, int maxThreads) : |
| | 22 base(minThreads, maxThreads) { |
| | 23 InitPool(); |
17 } | 24 } |
18 | 25 |
19 public WorkerPool(int threads) | 26 public WorkerPool(int threads) |
20 : base(threads) { | 27 : base(threads) { |
21 InitPool(); | 28 InitPool(); |
22 } | 29 } |
23 | 30 |
24 public WorkerPool() | 31 public WorkerPool() |
25 : base() { | 32 : base() { |
26 InitPool(); | 33 InitPool(); |
27 } | 34 } |
28 | 35 |
29 public Promise<T> Invoke<T>(Func<T> task) { | 36 public Promise<T> Invoke<T>(Func<T> task) { |
30 if (task == null) | 37 if (task == null) |
31 throw new ArgumentNullException("task"); | 38 throw new ArgumentNullException("task"); |
45 return promise; | 52 return promise; |
46 } | 53 } |
47 | 54 |
48 protected void EnqueueTask(Action unit) { | 55 protected void EnqueueTask(Action unit) { |
49 Debug.Assert(unit != null); | 56 Debug.Assert(unit != null); |
50 Interlocked.Increment(ref m_queueLength); | 57 var len = Interlocked.Increment(ref m_queueLength); |
51 m_queue.Enqueue(unit); | 58 m_queue.Enqueue(unit); |
52 // if there are sleeping threads in the pool wake one | 59 |
53 // probably this will lead a dry run | 60 if (ThreadCount == 0) |
54 WakeNewWorker(); | 61 // force to start |
| | 62 WakeNewWorker(false); |
| | 63 } |
| | 64 |
| | 65 protected override void WakeNewWorker(bool extend) { |
| | 66 if (extend && m_queueLength <= m_threshold) |
| | 67 // in this case we are in active thread and it request for additional workers |
| | 68 // satisfy it only when queue is longer than threshold |
| | 69 return; |
| | 70 base.WakeNewWorker(extend); |
55 } | 71 } |
56 | 72 |
57 protected override bool TryDequeue(out Action unit) { | 73 protected override bool TryDequeue(out Action unit) { |
58 if (m_queue.TryDequeue(out unit)) { | 74 if (m_queue.TryDequeue(out unit)) { |
59 Interlocked.Decrement(ref m_queueLength); | 75 Interlocked.Decrement(ref m_queueLength); |
63 } | 79 } |
64 | 80 |
65 protected override void InvokeUnit(Action unit) { | 81 protected override void InvokeUnit(Action unit) { |
66 unit(); | 82 unit(); |
67 } | 83 } |
| | 84 |
| | 85 protected override void Suspend() { |
| | 86 if (m_queueLength == 0) |
| | 87 base.Suspend(); |
| | 88 } |
68 } | 89 } |
69 } | 90 } |