annotate Implab/Parallels/WorkerPool.cs @ 205:8200ab154c8a v2

Added ResetState to RunnableComponent to reset in case of failure Added StateChanged event to IRunnable Renamed Promise.SUCCESS -> Promise.Success Added Promise.FromException Renamed Bundle -> PromiseAll in PromiseExtensions
author cin
date Tue, 25 Oct 2016 17:40:33 +0300
parents eb793fbbe4ea
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Diagnostics;
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
4 using Implab.Diagnostics;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
7 public class WorkerPool : DispatchPool<Action> {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents: 92
diff changeset
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 119
diff changeset
10 int m_queueLength;
16
cin
parents: 15
diff changeset
11 readonly int m_threshold = 1;
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
12
16
cin
parents: 15
diff changeset
13 public WorkerPool(int minThreads, int maxThreads, int threshold)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
14 : base(minThreads, maxThreads) {
16
cin
parents: 15
diff changeset
15 m_threshold = threshold;
cin
parents: 15
diff changeset
16 InitPool();
cin
parents: 15
diff changeset
17 }
cin
parents: 15
diff changeset
18
cin
parents: 15
diff changeset
19 public WorkerPool(int minThreads, int maxThreads) :
cin
parents: 15
diff changeset
20 base(minThreads, maxThreads) {
cin
parents: 15
diff changeset
21 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
22 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
23
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
24 public WorkerPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
25 : base(threads) {
16
cin
parents: 15
diff changeset
26 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
27 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
28
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
29 public WorkerPool() {
16
cin
parents: 15
diff changeset
30 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
31 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
32
149
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
33 public IPromise<T> Invoke<T>(Func<T> task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
34 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
35 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
36 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
37 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
38
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
39 var promise = new FuncTask<T>(task, null, null, true);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
40
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
41 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
42
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
43 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
45
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
46 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
47
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
48 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
49 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
50
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
51 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
52 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
53
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
54 public IPromise Invoke(Action task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
55 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
56 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
57 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
58 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
59
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
60 var promise = new ActionTask(task, null, null, true);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
61
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
62 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
63
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
64 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
66
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
67 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
68
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
69 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
70 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
71
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
72 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
73 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
74
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
76 if (task == null)
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
77 throw new ArgumentNullException("task");
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
78 if (IsDisposed)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
79 throw new ObjectDisposedException(ToString());
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
80
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
81 var promise = new Promise<T>();
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
82
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
83 var lop = TraceContext.Instance.CurrentOperation;
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
84
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 119
diff changeset
85 EnqueueTask(delegate {
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
86 TraceContext.Instance.EnterLogicalOperation(lop, false);
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
87 try {
149
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
88 if (!promise.CancelOperationIfRequested())
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
89 promise.Resolve(task(promise));
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
90 } catch (Exception e) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
91 promise.Reject(e);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
92 } finally {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
93 TraceContext.Instance.Leave();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
94 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
95 });
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
96
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
97 return promise;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
98 }
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
99
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
101 if (task == null)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
102 throw new ArgumentNullException("task");
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
103 if (IsDisposed)
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
104 throw new ObjectDisposedException(ToString());
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
105
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
106 var promise = new Promise();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
107
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
108 var lop = TraceContext.Instance.CurrentOperation;
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
109
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
110 EnqueueTask(delegate {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
112 try {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
113 if (!promise.CancelOperationIfRequested()) {
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
114 task(promise);
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
115 promise.Resolve();
eb793fbbe4ea fixed promises cancellation
cin
parents: 129
diff changeset
116 }
92
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
117 } catch (Exception e) {
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
118 promise.Reject(e);
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
119 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
120 TraceContext.Instance.Leave();
4c0e5ef99986 rewritten tracing
cin
parents: 81
diff changeset
121 }
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
122 });
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
123
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
124 return promise;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
125 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
126
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
127 protected void EnqueueTask(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
128 Debug.Assert(unit != null);
16
cin
parents: 15
diff changeset
129 var len = Interlocked.Increment(ref m_queueLength);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
130 m_queue.Enqueue(unit);
16
cin
parents: 15
diff changeset
131
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
132 if (len > m_threshold * PoolSize) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
133 StartWorker();
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
134 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
135
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
136 SignalThread();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
137 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
138
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
139 protected override bool TryDequeue(out Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
140 if (m_queue.TryDequeue(out unit)) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
141 Interlocked.Decrement(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
142 return true;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
143 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
144 return false;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
145 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
146
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
147 protected override void InvokeUnit(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
148 unit();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
149 }
16
cin
parents: 15
diff changeset
150
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
151 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
152 }