comparison Implab/Parallels/WorkerPool.cs @ 16:5a4b735ba669 promises

sync
author cin
date Thu, 07 Nov 2013 20:20:26 +0400
parents 0f982f9b7d4d
children 7cd4a843b4e4
comparison
equal deleted inserted replaced
15:0f982f9b7d4d 16:5a4b735ba669
8 namespace Implab.Parallels { 8 namespace Implab.Parallels {
9 public class WorkerPool : DispatchPool<Action> { 9 public class WorkerPool : DispatchPool<Action> {
10 10
11 MTQueue<Action> m_queue = new MTQueue<Action>(); 11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0; 12 int m_queueLength = 0;
13 readonly int m_threshold = 1;
13 14
14 public WorkerPool(int minThreads, int maxThreads) 15 public WorkerPool(int minThreads, int maxThreads, int threshold)
15 : base(minThreads, maxThreads) { 16 : base(minThreads, maxThreads) {
16 InitPool(); 17 m_threshold = threshold;
18 InitPool();
19 }
20
21 public WorkerPool(int minThreads, int maxThreads) :
22 base(minThreads, maxThreads) {
23 InitPool();
17 } 24 }
18 25
19 public WorkerPool(int threads) 26 public WorkerPool(int threads)
20 : base(threads) { 27 : base(threads) {
21 InitPool(); 28 InitPool();
22 } 29 }
23 30
24 public WorkerPool() 31 public WorkerPool()
25 : base() { 32 : base() {
26 InitPool(); 33 InitPool();
27 } 34 }
28 35
29 public Promise<T> Invoke<T>(Func<T> task) { 36 public Promise<T> Invoke<T>(Func<T> task) {
30 if (task == null) 37 if (task == null)
31 throw new ArgumentNullException("task"); 38 throw new ArgumentNullException("task");
45 return promise; 52 return promise;
46 } 53 }
47 54
48 protected void EnqueueTask(Action unit) { 55 protected void EnqueueTask(Action unit) {
49 Debug.Assert(unit != null); 56 Debug.Assert(unit != null);
50 Interlocked.Increment(ref m_queueLength); 57 var len = Interlocked.Increment(ref m_queueLength);
51 m_queue.Enqueue(unit); 58 m_queue.Enqueue(unit);
52 // if there are sleeping threads in the pool, wake one 59
53 // this will probably lead to a dry run 60 if (ThreadCount == 0)
54 WakeNewWorker(); 61 // force a worker to start
62 WakeNewWorker(false);
63 }
64
65 protected override void WakeNewWorker(bool extend) {
66 if (extend && m_queueLength <= m_threshold)
67 // in this case we are in an active thread that is requesting additional workers;
68 // satisfy the request only when the queue is longer than the threshold
69 return;
70 base.WakeNewWorker(extend);
55 } 71 }
56 72
57 protected override bool TryDequeue(out Action unit) { 73 protected override bool TryDequeue(out Action unit) {
58 if (m_queue.TryDequeue(out unit)) { 74 if (m_queue.TryDequeue(out unit)) {
59 Interlocked.Decrement(ref m_queueLength); 75 Interlocked.Decrement(ref m_queueLength);
63 } 79 }
64 80
65 protected override void InvokeUnit(Action unit) { 81 protected override void InvokeUnit(Action unit) {
66 unit(); 82 unit();
67 } 83 }
84
85 protected override void Suspend() {
86 if (m_queueLength == 0)
87 base.Suspend();
88 }
68 } 89 }
69 } 90 }