12
|
1 using System;
|
|
2 using System.Collections.Generic;
|
|
3 using System.Linq;
|
|
4 using System.Text;
|
|
5 using System.Threading;
|
|
6 using System.Diagnostics;
|
|
7
|
|
namespace Implab.Parallels {
    /// <summary>
    /// A dispatch pool whose work items are <see cref="Action"/> delegates held in an
    /// internal queue. Work is submitted through <see cref="Invoke{T}"/> (which wraps a
    /// function into a promise) or, for derived classes, through <see cref="EnqueueTask"/>.
    /// </summary>
    /// <remarks>
    /// Thread management is inherited from the base <c>DispatchPool</c>, which is not
    /// visible in this file; this class only supplies the queue, the pending-item counter
    /// and the policy hooks (<see cref="ExtendPool"/>, <see cref="Suspend"/>).
    /// </remarks>
    public class WorkerPool : DispatchPool<Action> {

        // Queue of pending work items.
        MTQueue<Action> m_queue = new MTQueue<Action>();

        // Number of pending work items. It is incremented BEFORE the item is placed in
        // m_queue and decremented after a successful dequeue, so it may briefly be larger
        // than the number of items actually available in the queue.
        int m_queueLength = 0;

        // Multiplier used by ExtendPool: extension is attempted only when the pending
        // counter exceeds m_threshold * ThreadCount. Defaults to 1.
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates a pool with the given thread limits and extension threshold.
        /// </summary>
        /// <param name="minThreads">Passed through to the base constructor.</param>
        /// <param name="maxThreads">Passed through to the base constructor.</param>
        /// <param name="threshold">
        /// Per-thread queue length above which <see cref="ExtendPool"/> forwards the
        /// request to the base class. The value is stored as given (not validated here).
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            // InitPool is defined outside this file (presumably by the base class).
            InitPool();
        }

        /// <summary>
        /// Creates a pool with the given thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Passed through to the base constructor.</param>
        /// <param name="maxThreads">Passed through to the base constructor.</param>
        public WorkerPool(int minThreads, int maxThreads) :
            base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the single-argument base constructor and the default
        /// threshold of 1.
        /// </summary>
        /// <param name="threads">Passed through to the base constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the parameterless base constructor and the default
        /// threshold of 1.
        /// </summary>
        public WorkerPool()
            : base() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution and returns a promise for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to run; must not be <c>null</c>.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by <paramref name="task"/>,
        /// or rejected with the exception caught while running it.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            EnqueueTask(delegate() {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    // Any exception caught here (from the task or from Resolve itself)
                    // is reported through the promise instead of escaping the work item.
                    promise.Reject(e);
                }
            });

            return promise;
        }

        /// <summary>
        /// Adds a work item to the queue, then asks the pool to grow; if the pool was not
        /// extended, <c>WakePool</c> is called instead.
        /// </summary>
        /// <param name="unit">The work item; expected to be non-null (checked in debug builds only).</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // Count the item before publishing it, so the counter never under-reports
            // work that is about to appear in the queue.
            Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            // WakePool is defined outside this file; presumably it signals a sleeping
            // worker to pick up the new item.
            if (!ExtendPool())
                WakePool();
        }

        /// <summary>
        /// Forwards the extension request to the base class only when the pending counter
        /// is greater than <c>m_threshold * ThreadCount</c>.
        /// </summary>
        /// <returns>
        /// <c>false</c> when the pending counter is within the threshold; otherwise the
        /// result of <c>base.ExtendPool()</c>.
        /// </returns>
        protected override bool ExtendPool() {
            // Original author's note: in this case we are in an active thread that is
            // requesting additional workers; satisfy the request only when the queue is
            // longer than the threshold.
            if (m_queueLength <= m_threshold * ThreadCount)
                return false;
            return base.ExtendPool();
        }

        /// <summary>
        /// Attempts to take the next work item from the queue, decrementing the pending
        /// counter on success.
        /// </summary>
        /// <param name="unit">Receives the dequeued item when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if an item was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Runs a work item by invoking the delegate.
        /// </summary>
        /// <param name="unit">The work item to invoke.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }

        /// <summary>
        /// Delegates to <c>base.Suspend()</c> only when the pending counter is zero;
        /// otherwise returns immediately.
        /// </summary>
        protected override void Suspend() {
            // A non-zero counter means an item is queued or about to be queued
            // (see EnqueueTask), so do not suspend in that case.
            if (m_queueLength == 0)
                base.Suspend();
        }
    }
}
|