Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 33:b255e4aeef17
removed the reference to the parent from the promise object this allows
resolved promises to release parents and results they are holding.
Added complete set of operations to IPromiseBase interface
Subscribing to the cancellation event of the promise should not affect it's
IsExclusive property
More tests.
author | cin |
---|---|
date | Thu, 10 Apr 2014 02:39:29 +0400 |
parents | 2fad2d1f4b03 |
children | dabf79fde388 |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
9 public abstract class DispatchPool<TUnit> : IDisposable { | |
10 readonly int m_minThreads; | |
11 readonly int m_maxThreads; | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
12 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
13 int m_createdThreads = 0; // the current size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
14 int m_activeThreads = 0; // the count of threads which are active |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
15 int m_sleepingThreads = 0; // the count of currently inactive threads |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
19 int m_wakeEvents = 0; // the count of wake events |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
20 |
15 | 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); |
22 | |
23 protected DispatchPool(int min, int max) { | |
24 if (min < 0) | |
25 throw new ArgumentOutOfRangeException("min"); | |
26 if (max <= 0) | |
27 throw new ArgumentOutOfRangeException("max"); | |
28 | |
29 if (min > max) | |
30 min = max; | |
31 m_minThreads = min; | |
32 m_maxThreads = max; | |
33 } | |
34 | |
35 protected DispatchPool(int threads) | |
36 : this(threads, threads) { | |
37 } | |
38 | |
39 protected DispatchPool() { | |
40 int maxThreads, maxCP; | |
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
42 | |
43 m_minThreads = 0; | |
44 m_maxThreads = maxThreads; | |
45 } | |
46 | |
47 protected void InitPool() { | |
48 for (int i = 0; i < m_minThreads; i++) | |
49 StartWorker(); | |
50 } | |
51 | |
20 | 52 public int PoolSize { |
15 | 53 get { |
20 | 54 return m_createdThreads; |
55 } | |
56 } | |
57 | |
58 public int ActiveThreads { | |
59 get { | |
60 return m_activeThreads; | |
15 | 61 } |
62 } | |
63 | |
64 public int MaxRunningThreads { | |
65 get { | |
66 return m_maxRunningThreads; | |
67 } | |
68 } | |
69 | |
70 protected bool IsDisposed { | |
71 get { | |
72 return m_exitRequired != 0; | |
73 } | |
74 } | |
75 | |
17 | 76 protected abstract bool TryDequeue(out TUnit unit); |
77 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
78 #region thread execution traits |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
79 int SignalThread() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
80 var signals = Interlocked.Increment(ref m_wakeEvents); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
81 if(signals == 1) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
82 m_hasTasks.Set(); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
83 return signals; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
84 } |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
85 |
22 | 86 bool FetchSignalOrWait(int timeout) { |
87 var start = Environment.TickCount; | |
88 | |
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | |
90 // ее вернуть, чтобы другой ожидающий поток смог | |
91 bool hasLock = false; | |
92 do { | |
93 int signals; | |
94 do { | |
95 signals = m_wakeEvents; | |
96 if (signals == 0) | |
97 break; | |
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | |
99 | |
100 if (signals >= 1) { | |
101 if (signals > 1 && hasLock) | |
102 m_hasTasks.Set(); | |
103 return true; | |
104 } | |
105 | |
106 if (timeout != -1) | |
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | |
108 | |
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | |
110 // и уйдет на пустой цикл, после чего заблокируется | |
111 | |
112 hasLock = true; | |
113 } while (m_hasTasks.WaitOne(timeout)); | |
114 | |
115 return false; | |
116 } | |
117 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
118 bool Sleep(int timeout) { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
119 Interlocked.Increment(ref m_sleepingThreads); |
22 | 120 if (FetchSignalOrWait(timeout)) { |
121 Interlocked.Decrement(ref m_sleepingThreads); | |
17 | 122 return true; |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
123 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
124 Interlocked.Decrement(ref m_sleepingThreads); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
125 return false; |
20 | 126 } |
17 | 127 } |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
128 #endregion |
17 | 129 |
130 /// <summary> | |
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |
132 /// </summary> | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
133 protected void GrowPool() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
134 if (m_exitRequired != 0) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
135 return; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
136 if (m_sleepingThreads > m_wakeEvents) { |
22 | 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); |
138 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
139 // all sleeping threads may gone |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
140 SignalThread(); // wake a sleeping thread; |
17 | 141 |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
142 // we can't check whether signal has been processed |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
143 // anyway it may take some time for the thread to start |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
144 // we will ensure that at least one thread is running |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
145 |
30 | 146 EnsurePoolIsAlive(); |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
147 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
148 // if there is no sleeping threads in the pool |
30 | 149 if (!StartWorker()) { |
150 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue | |
24 | 151 // send it a signal to spin again |
30 | 152 SignalThread(); |
153 EnsurePoolIsAlive(); | |
154 } | |
155 } | |
156 } | |
157 | |
158 private void EnsurePoolIsAlive() { | |
159 if (AllocateThreadSlot(1)) { | |
160 // if there were no threads in the pool | |
161 var worker = new Thread(this.Worker); | |
162 worker.IsBackground = true; | |
163 worker.Start(); | |
17 | 164 } |
165 } | |
166 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
167 private bool Suspend() { |
20 | 168 //no tasks left, exit if the thread is no longer needed |
169 bool last; | |
170 bool requestExit; | |
171 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
172 // if threads have a timeout before releasing |
20 | 173 if (m_releaseTimeout > 0) |
174 requestExit = !Sleep(m_releaseTimeout); | |
175 else | |
176 requestExit = true; | |
177 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
178 if (!requestExit) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
179 return true; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
180 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
181 // release unsused thread |
20 | 182 if (requestExit && ReleaseThreadSlot(out last)) { |
183 // in case at the moment the last thread was being released | |
184 // a new task was added to the queue, we need to try | |
185 // to revoke the thread to avoid the situation when the task is left unprocessed | |
30 | 186 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false |
187 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it | |
188 return AllocateThreadSlot(1); // ensure that at least one thread is alive | |
20 | 189 } |
190 | |
191 return false; | |
192 } | |
193 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
194 // wait till infinity |
20 | 195 Sleep(-1); |
196 | |
197 return true; | |
17 | 198 } |
199 | |
200 #region thread slots traits | |
201 | |
202 bool AllocateThreadSlot() { | |
16 | 203 int current; |
15 | 204 // use spins to allocate slot for the new thread |
205 do { | |
20 | 206 current = m_createdThreads; |
15 | 207 if (current >= m_maxThreads || m_exitRequired != 0) |
208 // no more slots left or the pool has been disposed | |
209 return false; | |
20 | 210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
15 | 211 |
17 | 212 UpdateMaxThreads(current + 1); |
213 | |
214 return true; | |
215 } | |
15 | 216 |
17 | 217 bool AllocateThreadSlot(int desired) { |
20 | 218 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) |
17 | 219 return false; |
220 | |
221 UpdateMaxThreads(desired); | |
15 | 222 |
17 | 223 return true; |
224 } | |
225 | |
226 bool ReleaseThreadSlot(out bool last) { | |
227 last = false; | |
228 int current; | |
229 // use spins to release slot for the new thread | |
230 do { | |
20 | 231 current = m_createdThreads; |
17 | 232 if (current <= m_minThreads && m_exitRequired == 0) |
233 // the thread is reserved | |
234 return false; | |
20 | 235 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); |
17 | 236 |
237 last = (current == 1); | |
15 | 238 |
239 return true; | |
240 } | |
241 | |
17 | 242 /// <summary> |
243 /// releases thread slot unconditionally, used during cleanup | |
244 /// </summary> | |
245 /// <returns>true - no more threads left</returns> | |
246 bool ReleaseThreadSlotAnyway() { | |
20 | 247 var left = Interlocked.Decrement(ref m_createdThreads); |
17 | 248 return left == 0; |
15 | 249 } |
250 | |
17 | 251 void UpdateMaxThreads(int count) { |
252 int max; | |
16 | 253 do { |
17 | 254 max = m_maxRunningThreads; |
255 if (max >= count) | |
256 break; | |
257 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
16 | 258 } |
259 | |
17 | 260 #endregion |
261 | |
262 bool StartWorker() { | |
263 if (AllocateThreadSlot()) { | |
264 // slot successfully allocated | |
265 var worker = new Thread(this.Worker); | |
266 worker.IsBackground = true; | |
267 worker.Start(); | |
268 | |
269 return true; | |
270 } else { | |
271 return false; | |
272 } | |
16 | 273 } |
274 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
275 protected abstract void InvokeUnit(TUnit unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
276 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
277 void Worker() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
278 TUnit unit; |
22 | 279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
280 Interlocked.Increment(ref m_activeThreads); |
15 | 281 do { |
282 // exit if requested | |
283 if (m_exitRequired != 0) { | |
284 // release the thread slot | |
20 | 285 Interlocked.Decrement(ref m_activeThreads); |
17 | 286 if (ReleaseThreadSlotAnyway()) // it was the last worker |
15 | 287 m_hasTasks.Dispose(); |
288 else | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
289 SignalThread(); // wake next worker |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
290 break; |
15 | 291 } |
292 | |
293 // fetch task | |
294 if (TryDequeue(out unit)) { | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
295 InvokeUnit(unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
296 continue; |
15 | 297 } |
298 | |
20 | 299 Interlocked.Decrement(ref m_activeThreads); |
15 | 300 |
16 | 301 // entering suspend state |
302 // keep this thread and wait | |
20 | 303 if (!Suspend()) |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
304 break; |
22 | 305 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
20 | 306 Interlocked.Increment(ref m_activeThreads); |
15 | 307 } while (true); |
22 | 308 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
15 | 309 } |
310 | |
311 protected virtual void Dispose(bool disposing) { | |
312 if (disposing) { | |
313 if (m_exitRequired == 0) { | |
314 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | |
315 return; | |
316 | |
317 // wake sleeping threads | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
318 if (m_createdThreads > 0) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
319 SignalThread(); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
320 else |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
321 m_hasTasks.Dispose(); |
15 | 322 GC.SuppressFinalize(this); |
323 } | |
324 } | |
325 } | |
326 | |
327 public void Dispose() { | |
328 Dispose(true); | |
329 } | |
330 | |
331 ~DispatchPool() { | |
332 Dispose(false); | |
333 } | |
334 } | |
335 } |