annotate Implab/Parallels/DispatchPool.cs @ 196:40d7fed4a09e

fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents f803565868a4
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
1 using System;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
2 using System.Threading;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
3
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
4 namespace Implab.Parallels {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
5 public abstract class DispatchPool<TUnit> : IDisposable {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
6 readonly int m_minThreadsLimit;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
7 readonly int m_maxThreadsLimit;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
9
93
dc4942d09e74 improved tracing
cin
parents: 92
diff changeset
10 int m_threads; // the current size of the pool
dc4942d09e74 improved tracing
cin
parents: 92
diff changeset
11 int m_maxRunningThreads; // the meximum reached size of the pool
dc4942d09e74 improved tracing
cin
parents: 92
diff changeset
12 int m_exit; // the pool is going to shutdown, all unused workers are released
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
13
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
14 readonly object m_signal = new object(); // used to pulse waiting threads
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
16 protected DispatchPool(int min, int max) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
17 if (min < 0)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
18 throw new ArgumentOutOfRangeException("min");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
19 if (max <= 0)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
20 throw new ArgumentOutOfRangeException("max");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
21
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
22 if (min > max)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
23 min = max;
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
24 m_minThreadsLimit = min;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
25 m_maxThreadsLimit = max;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
26 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
27
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
28 protected DispatchPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
29 : this(threads, threads) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
30 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
31
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
32 protected DispatchPool() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
33
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
34 m_minThreadsLimit = 0;
125
f803565868a4 improved performance of promises
cin
parents: 93
diff changeset
35 m_maxThreadsLimit = Environment.ProcessorCount;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
36 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
37
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
38 protected void InitPool() {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
39 for (int i = 0; i < m_minThreadsLimit; i++)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
40 StartWorker();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
41 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
42
20
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
43 public int PoolSize {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
44 get {
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
45 Thread.MemoryBarrier();
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
46 return m_threads;
20
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
47 }
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
48 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
49
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
50 public int MaxRunningThreads {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
51 get {
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
52 Thread.MemoryBarrier();
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
53 return m_maxRunningThreads;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
54 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
55 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
56
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
57 protected bool IsDisposed {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
58 get {
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
59 Thread.MemoryBarrier();
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
60 return m_exit == 1;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
61 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
62 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
63
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
64 protected abstract bool TryDequeue(out TUnit unit);
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
65
89
ce0171cacec4 improved performance of a chained map operation
cin
parents: 81
diff changeset
66 bool Dequeue(out TUnit unit, int timeout) {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
67 int ts = Environment.TickCount;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
68 if (TryDequeue(out unit))
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
69 return true;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
70 lock (m_signal) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
71 while (!TryDequeue(out unit) && m_exit == 0)
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
72 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
73 // timeout
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
74 return false;
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
75 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
76 // queue item or terminate
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
77 Monitor.Pulse(m_signal);
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
78 if (m_exit == 1)
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
79 return false;
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
80 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
81 return true;
22
5a35900264f5 implemented nonblocking wake singnals processing
cin
parents: 21
diff changeset
82 }
5a35900264f5 implemented nonblocking wake singnals processing
cin
parents: 21
diff changeset
83
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
84 protected void SignalThread() {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
85 lock (m_signal) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
86 Monitor.Pulse(m_signal);
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 24
diff changeset
87 }
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 24
diff changeset
88 }
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 24
diff changeset
89
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
90 #region thread slots traits
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
91
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
92 bool AllocateThreadSlot() {
16
cin
parents: 15
diff changeset
93 int current;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
94 // use spins to allocate slot for the new thread
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
95 do {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
96 current = m_threads;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
97 if (current >= m_maxThreadsLimit || m_exit == 1)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
98 // no more slots left or the pool has been disposed
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
99 return false;
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
100 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
101
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
102 UpdateMaxThreads(current + 1);
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
103
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
104 return true;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
105 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
106
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
107 bool AllocateThreadSlot(int desired) {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
108 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
109 return false;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
110
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
111 UpdateMaxThreads(desired);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
112
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
113 return true;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
114 }
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
115
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
116 bool ReleaseThreadSlot(out bool last) {
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
117 last = false;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
118 int current;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
119 // use spins to release slot for the new thread
80
4f20870d0816 added memory barriers
cin
parents: 41
diff changeset
120 Thread.MemoryBarrier();
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
121 do {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
122 current = m_threads;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
123 if (current <= m_minThreadsLimit && m_exit == 0)
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
124 // the thread is reserved
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
125 return false;
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
126 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
127
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
128 last = (current == 1);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
129
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
130 return true;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
131 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
132
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
133 void UpdateMaxThreads(int count) {
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
134 int max;
16
cin
parents: 15
diff changeset
135 do {
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
136 max = m_maxRunningThreads;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
137 if (max >= count)
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
138 break;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
139 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
16
cin
parents: 15
diff changeset
140 }
cin
parents: 15
diff changeset
141
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
142 #endregion
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
143
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
144 protected bool StartWorker() {
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
145 if (AllocateThreadSlot()) {
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
146 // slot successfully allocated
92
4c0e5ef99986 rewritten tracing
cin
parents: 89
diff changeset
147 var worker = new Thread(Worker);
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
148 worker.IsBackground = true;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
149 worker.Start();
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
150
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
151 return true;
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
152 }
92
4c0e5ef99986 rewritten tracing
cin
parents: 89
diff changeset
153 return false;
16
cin
parents: 15
diff changeset
154 }
cin
parents: 15
diff changeset
155
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
156 protected abstract void InvokeUnit(TUnit unit);
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
157
41
2fc0fbe7d58b Added TraceContext support to array traits
cin
parents: 34
diff changeset
158 protected virtual void Worker() {
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
159 TUnit unit;
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
160 bool last;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
161 do {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
162 while (Dequeue(out unit, m_releaseTimeout)) {
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
163 InvokeUnit(unit);
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
164 }
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
165 if(!ReleaseThreadSlot(out last))
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
166 continue;
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
167 // queue may be not empty
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
168 if (last && TryDequeue(out unit)) {
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
169 InvokeUnit(unit);
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
170 if (AllocateThreadSlot(1))
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
171 continue;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
172 // we can safely exit since pool is alive
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
173 }
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
174 break;
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
175 } while(true);
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
176 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
177
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
178
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
179 protected virtual void Dispose(bool disposing) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
180 if (disposing) {
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
181 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
182 // wake sleeping threads
81
2c5631b43c7d dispatch pool rewritten
cin
parents: 80
diff changeset
183 SignalThread();
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
184 GC.SuppressFinalize(this);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
185 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
186 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
187 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
188
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
189 public void Dispose() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
190 Dispose(true);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
191 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
192
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
193 ~DispatchPool() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
194 Dispose(false);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
195 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
196 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
197 }