12
|
1 using System;
|
|
2 using System.Collections.Generic;
|
|
3 using System.Linq;
|
|
4 using System.Text;
|
|
5 using System.Threading;
|
|
6 using System.Diagnostics;
|
35
|
7 using Implab.Diagnostics;
|
12
|
8
|
|
namespace Implab.Parallels {
    /// <summary>
    /// A <see cref="DispatchPool{TUnit}"/> whose units of work are <see cref="Action"/>
    /// delegates kept in an internal <see cref="MTQueue{T}"/>.
    /// </summary>
    public class WorkerPool : DispatchPool<Action> {

        // Pending units of work, consumed through TryDequeue.
        MTQueue<Action> m_queue = new MTQueue<Action>();

        // Number of queued units: incremented in EnqueueTask, decremented on every
        // successful TryDequeue. Only modified through Interlocked operations.
        int m_queueLength = 0;

        // Multiplier used by EnqueueTask: an additional worker is requested when the
        // queued count exceeds m_threshold * PoolSize.
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates a pool with the specified thread limits and queue threshold.
        /// </summary>
        /// <param name="minThreads">Passed to the base class constructor.</param>
        /// <param name="maxThreads">Passed to the base class constructor.</param>
        /// <param name="threshold">
        /// Queue length multiplier: <see cref="EnqueueTask"/> requests an additional worker
        /// when the number of queued units exceeds <c>threshold * PoolSize</c>.
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            // NOTE(review): InitPool is inherited; presumably it starts the initial
            // workers - confirm against DispatchPool.
            InitPool();
        }

        /// <summary>
        /// Creates a pool with the specified thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Passed to the base class constructor.</param>
        /// <param name="maxThreads">Passed to the base class constructor.</param>
        public WorkerPool(int minThreads, int maxThreads) :
            base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the single-argument base constructor and the default
        /// threshold of 1.
        /// </summary>
        /// <param name="threads">Passed to the base class constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates a pool using the base class defaults and the default threshold of 1.
        /// </summary>
        public WorkerPool()
            : base() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution by the pool and returns a promise
        /// for its result.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to execute.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by <paramref name="task"/>,
        /// or rejected with the exception it throws.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool is already disposed.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            // Capture the caller's trace context now so that the task is later run
            // through it (caller.Invoke) rather than directly.
            var caller = TraceContext.Snapshot();

            EnqueueTask(delegate() {
                caller.Invoke(delegate() {
                    try {
                        promise.Resolve(task());
                    } catch (Exception e) {
                        // Any failure of the task is reported through the promise.
                        promise.Reject(e);
                    }
                });
            });

            return promise;
        }

        /// <summary>
        /// Adds a unit of work to the queue, requests an additional worker when the queue
        /// grows beyond <c>m_threshold * PoolSize</c>, and signals the pool.
        /// </summary>
        /// <param name="unit">The unit of work to enqueue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="unit"/> is null.</exception>
        protected void EnqueueTask(Action unit) {
            // Debug.Assert is compiled out of release builds, which would let a null unit
            // be counted, queued and fail only later inside InvokeUnit. Reject it here,
            // before any state is modified.
            if (unit == null)
                throw new ArgumentNullException("unit");

            // The counter is bumped before the unit becomes visible in the queue;
            // TryDequeue decrements it only after a successful dequeue.
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (len > m_threshold * PoolSize) {
                StartWorker();
            }

            SignalThread();
        }

        /// <summary>
        /// Tries to take the next unit of work from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns true.</param>
        /// <returns>True if a unit was dequeued; otherwise false.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Executes a unit of work by invoking the delegate.
        /// </summary>
        /// <param name="unit">The delegate to invoke.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }

    }
}
|