annotate Implab/Parallels/WorkerPool.cs @ 196:40d7fed4a09e

fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents eb793fbbe4ea
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Diagnostics;
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
4 using Implab.Diagnostics;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
7 public class WorkerPool : DispatchPool<Action> {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents: 92
diff changeset
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 119
diff changeset
10 int m_queueLength;
16
cin
parents: 15
diff changeset
11 readonly int m_threshold = 1;
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
12
16
cin
parents: 15
diff changeset
13 public WorkerPool(int minThreads, int maxThreads, int threshold)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
14 : base(minThreads, maxThreads) {
16
cin
parents: 15
diff changeset
15 m_threshold = threshold;
cin
parents: 15
diff changeset
16 InitPool();
cin
parents: 15
diff changeset
17 }
cin
parents: 15
diff changeset
18
cin
parents: 15
diff changeset
19 public WorkerPool(int minThreads, int maxThreads) :
cin
parents: 15
diff changeset
20 base(minThreads, maxThreads) {
cin
parents: 15
diff changeset
21 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
22 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
23
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
24 public WorkerPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
25 : base(threads) {
16
cin
parents: 15
diff changeset
26 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
27 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
28
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
29 public WorkerPool() {
16
cin
parents: 15
diff changeset
30 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
31 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
32
149
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
33 public IPromise<T> Invoke<T>(Func<T> task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
34 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
35 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
36 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
37 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
38
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
39 var promise = new FuncTask<T>(task, null, null, true);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
40
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
41 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
42
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
43 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
45
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
46 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
47
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
48 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
49 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
50
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
51 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
52 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
53
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
54 public IPromise Invoke(Action task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
55 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
56 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
57 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
58 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
59
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
60 var promise = new ActionTask(task, null, null, true);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
61
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
62 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
63
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
64 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
66
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
67 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
68
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
69 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
70 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
71
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
72 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
73 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
74
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
76 if (task == null)
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
77 throw new ArgumentNullException("task");
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
78 if (IsDisposed)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
79 throw new ObjectDisposedException(ToString());
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
80
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
81 var promise = new Promise<T>();
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
82
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
83 var lop = TraceContext.Instance.CurrentOperation;
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
84
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 119
diff changeset
85 EnqueueTask(delegate {
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
86 TraceContext.Instance.EnterLogicalOperation(lop, false);
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
87 try {
149
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
88 if (!promise.CancelOperationIfRequested())
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
89 promise.Resolve(task(promise));
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
90 } catch (Exception e) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
91 promise.Reject(e);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
92 } finally {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
93 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
94 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
95 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
96
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
97 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
98 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
99
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
101 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
102 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
103 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
104 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
105
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
106 var promise = new Promise();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
107
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
108 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
109
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
110 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
112 try {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
113 if (!promise.CancelOperationIfRequested()) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
114 task(promise);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
115 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
116 }
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
117 } catch (Exception e) {
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
118 promise.Reject(e);
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
119 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
120 TraceContext.Instance.Leave();
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
121 }
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
122 });
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
123
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
124 return promise;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
125 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
126
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
127 protected void EnqueueTask(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
128 Debug.Assert(unit != null);
16
cin
parents: 15
diff changeset
129 var len = Interlocked.Increment(ref m_queueLength);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
130 m_queue.Enqueue(unit);
16
cin
parents: 15
diff changeset
131
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
132 if (len > m_threshold * PoolSize) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
133 StartWorker();
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
134 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
135
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
136 SignalThread();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
137 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
138
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
139 protected override bool TryDequeue(out Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
140 if (m_queue.TryDequeue(out unit)) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
141 Interlocked.Decrement(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
142 return true;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
143 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
144 return false;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
145 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
146
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
147 protected override void InvokeUnit(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
148 unit();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
149 }
16
cin
parents: 15
diff changeset
150
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
151 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
152 }