12
|
1 using System;
|
|
2 using System.Threading;
|
|
3 using System.Diagnostics;
|
35
|
4 using Implab.Diagnostics;
|
12
|
5
|
|
6 namespace Implab.Parallels {
|
15
|
7 public class WorkerPool : DispatchPool<Action> {
|
12
|
8
|
15
|
9 MTQueue<Action> m_queue = new MTQueue<Action>();
|
|
10 int m_queueLength = 0;
|
16
|
11 readonly int m_threshold = 1;
|
13
|
12
|
16
|
13 public WorkerPool(int minThreads, int maxThreads, int threshold)
|
15
|
14 : base(minThreads, maxThreads) {
|
16
|
15 m_threshold = threshold;
|
|
16 InitPool();
|
|
17 }
|
|
18
|
|
19 public WorkerPool(int minThreads, int maxThreads) :
|
|
20 base(minThreads, maxThreads) {
|
|
21 InitPool();
|
13
|
22 }
|
|
23
|
15
|
24 public WorkerPool(int threads)
|
|
25 : base(threads) {
|
16
|
26 InitPool();
|
13
|
27 }
|
|
28
|
92
|
29 public WorkerPool() {
|
16
|
30 InitPool();
|
13
|
31 }
|
|
32
|
12
|
33 public Promise<T> Invoke<T>(Func<T> task) {
|
|
34 if (task == null)
|
|
35 throw new ArgumentNullException("task");
|
15
|
36 if (IsDisposed)
|
|
37 throw new ObjectDisposedException(ToString());
|
12
|
38
|
|
39 var promise = new Promise<T>();
|
|
40
|
92
|
41 var lop = TraceContext.Instance.CurrentOperation;
|
35
|
42
|
15
|
43 EnqueueTask(delegate() {
|
92
|
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
|
|
45 try {
|
|
46 promise.Resolve(task());
|
|
47 } catch (Exception e) {
|
|
48 promise.Reject(e);
|
|
49 } finally {
|
|
50 TraceContext.Instance.Leave();
|
|
51 }
|
13
|
52 });
|
12
|
53
|
|
54 return promise;
|
|
55 }
|
|
56
|
15
|
57 protected void EnqueueTask(Action unit) {
|
|
58 Debug.Assert(unit != null);
|
16
|
59 var len = Interlocked.Increment(ref m_queueLength);
|
15
|
60 m_queue.Enqueue(unit);
|
16
|
61
|
81
|
62 if (len > m_threshold * PoolSize) {
|
|
63 StartWorker();
|
80
|
64 }
|
81
|
65
|
|
66 SignalThread();
|
12
|
67 }
|
|
68
|
15
|
69 protected override bool TryDequeue(out Action unit) {
|
|
70 if (m_queue.TryDequeue(out unit)) {
|
|
71 Interlocked.Decrement(ref m_queueLength);
|
|
72 return true;
|
12
|
73 }
|
15
|
74 return false;
|
12
|
75 }
|
|
76
|
15
|
77 protected override void InvokeUnit(Action unit) {
|
|
78 unit();
|
12
|
79 }
|
16
|
80
|
12
|
81 }
|
|
82 }
|