Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 22:5a35900264f5 promises
implemented nonblocking wake singnals processing
author | cin |
---|---|
date | Wed, 13 Nov 2013 14:03:00 +0400 |
parents | 6a56df4ec59e |
children | ee04e1fa78da |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
9 public abstract class DispatchPool<TUnit> : IDisposable { | |
10 readonly int m_minThreads; | |
11 readonly int m_maxThreads; | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
12 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
13 int m_createdThreads = 0; // the current size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
14 int m_activeThreads = 0; // the count of threads which are active |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
15 int m_sleepingThreads = 0; // the count of currently inactive threads |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
19 int m_wakeEvents = 0; // the count of wake events |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
20 |
15 | 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); |
22 | |
23 protected DispatchPool(int min, int max) { | |
24 if (min < 0) | |
25 throw new ArgumentOutOfRangeException("min"); | |
26 if (max <= 0) | |
27 throw new ArgumentOutOfRangeException("max"); | |
28 | |
29 if (min > max) | |
30 min = max; | |
31 m_minThreads = min; | |
32 m_maxThreads = max; | |
33 } | |
34 | |
35 protected DispatchPool(int threads) | |
36 : this(threads, threads) { | |
37 } | |
38 | |
39 protected DispatchPool() { | |
40 int maxThreads, maxCP; | |
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
42 | |
43 m_minThreads = 0; | |
44 m_maxThreads = maxThreads; | |
45 } | |
46 | |
47 protected void InitPool() { | |
48 for (int i = 0; i < m_minThreads; i++) | |
49 StartWorker(); | |
50 } | |
51 | |
20 | 52 public int PoolSize { |
15 | 53 get { |
20 | 54 return m_createdThreads; |
55 } | |
56 } | |
57 | |
58 public int ActiveThreads { | |
59 get { | |
60 return m_activeThreads; | |
15 | 61 } |
62 } | |
63 | |
64 public int MaxRunningThreads { | |
65 get { | |
66 return m_maxRunningThreads; | |
67 } | |
68 } | |
69 | |
70 protected bool IsDisposed { | |
71 get { | |
72 return m_exitRequired != 0; | |
73 } | |
74 } | |
75 | |
17 | 76 protected abstract bool TryDequeue(out TUnit unit); |
77 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
78 #region thread execution traits |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
79 int SignalThread() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
80 var signals = Interlocked.Increment(ref m_wakeEvents); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
81 if(signals == 1) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
82 m_hasTasks.Set(); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
83 return signals; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
84 } |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
85 |
22 | 86 bool FetchSignalOrWait(int timeout) { |
87 var start = Environment.TickCount; | |
88 | |
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | |
90 // ее вернуть, чтобы другой ожидающий поток смог | |
91 bool hasLock = false; | |
92 do { | |
93 int signals; | |
94 do { | |
95 signals = m_wakeEvents; | |
96 if (signals == 0) | |
97 break; | |
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | |
99 | |
100 if (signals >= 1) { | |
101 if (signals > 1 && hasLock) | |
102 m_hasTasks.Set(); | |
103 return true; | |
104 } | |
105 | |
106 if (timeout != -1) | |
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | |
108 | |
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | |
110 // и уйдет на пустой цикл, после чего заблокируется | |
111 | |
112 hasLock = true; | |
113 } while (m_hasTasks.WaitOne(timeout)); | |
114 | |
115 return false; | |
116 } | |
117 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
118 bool Sleep(int timeout) { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
119 Interlocked.Increment(ref m_sleepingThreads); |
22 | 120 if (FetchSignalOrWait(timeout)) { |
121 Interlocked.Decrement(ref m_sleepingThreads); | |
17 | 122 return true; |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
123 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
124 Interlocked.Decrement(ref m_sleepingThreads); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
125 return false; |
20 | 126 } |
17 | 127 } |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
128 #endregion |
17 | 129 |
130 /// <summary> | |
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |
132 /// </summary> | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
133 protected void GrowPool() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
134 if (m_exitRequired != 0) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
135 return; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
136 if (m_sleepingThreads > m_wakeEvents) { |
22 | 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); |
138 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
139 // all sleeping threads may gone |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
140 SignalThread(); // wake a sleeping thread; |
17 | 141 |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
142 // we can't check whether signal has been processed |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
143 // anyway it may take some time for the thread to start |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
144 // we will ensure that at least one thread is running |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
145 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
146 if (AllocateThreadSlot(1)) { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
147 // if there were no threads in the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
148 var worker = new Thread(this.Worker); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
149 worker.IsBackground = true; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
150 worker.Start(); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
151 } |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
152 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
153 // if there is no sleeping threads in the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
154 StartWorker(); |
17 | 155 } |
156 } | |
157 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
158 private bool Suspend() { |
20 | 159 //no tasks left, exit if the thread is no longer needed |
160 bool last; | |
161 bool requestExit; | |
162 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
163 // if threads have a timeout before releasing |
20 | 164 if (m_releaseTimeout > 0) |
165 requestExit = !Sleep(m_releaseTimeout); | |
166 else | |
167 requestExit = true; | |
168 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
169 if (!requestExit) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
170 return true; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
171 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
172 // release unsused thread |
20 | 173 if (requestExit && ReleaseThreadSlot(out last)) { |
174 // in case at the moment the last thread was being released | |
175 // a new task was added to the queue, we need to try | |
176 // to revoke the thread to avoid the situation when the task is left unprocessed | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
177 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false |
20 | 178 if (AllocateThreadSlot(1)) |
179 return true; // spin again... | |
180 else | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
181 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
182 |
20 | 183 } |
184 | |
185 return false; | |
186 } | |
187 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
188 // wait till infinity |
20 | 189 Sleep(-1); |
190 | |
191 return true; | |
17 | 192 } |
193 | |
194 #region thread slots traits | |
195 | |
196 bool AllocateThreadSlot() { | |
16 | 197 int current; |
15 | 198 // use spins to allocate slot for the new thread |
199 do { | |
20 | 200 current = m_createdThreads; |
15 | 201 if (current >= m_maxThreads || m_exitRequired != 0) |
202 // no more slots left or the pool has been disposed | |
203 return false; | |
20 | 204 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
15 | 205 |
17 | 206 UpdateMaxThreads(current + 1); |
207 | |
208 return true; | |
209 } | |
15 | 210 |
17 | 211 bool AllocateThreadSlot(int desired) { |
20 | 212 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) |
17 | 213 return false; |
214 | |
215 UpdateMaxThreads(desired); | |
15 | 216 |
17 | 217 return true; |
218 } | |
219 | |
220 bool ReleaseThreadSlot(out bool last) { | |
221 last = false; | |
222 int current; | |
223 // use spins to release slot for the new thread | |
224 do { | |
20 | 225 current = m_createdThreads; |
17 | 226 if (current <= m_minThreads && m_exitRequired == 0) |
227 // the thread is reserved | |
228 return false; | |
20 | 229 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); |
17 | 230 |
231 last = (current == 1); | |
15 | 232 |
233 return true; | |
234 } | |
235 | |
17 | 236 /// <summary> |
237 /// releases thread slot unconditionally, used during cleanup | |
238 /// </summary> | |
239 /// <returns>true - no more threads left</returns> | |
240 bool ReleaseThreadSlotAnyway() { | |
20 | 241 var left = Interlocked.Decrement(ref m_createdThreads); |
17 | 242 return left == 0; |
15 | 243 } |
244 | |
17 | 245 void UpdateMaxThreads(int count) { |
246 int max; | |
16 | 247 do { |
17 | 248 max = m_maxRunningThreads; |
249 if (max >= count) | |
250 break; | |
251 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
16 | 252 } |
253 | |
17 | 254 #endregion |
255 | |
256 bool StartWorker() { | |
257 if (AllocateThreadSlot()) { | |
258 // slot successfully allocated | |
259 var worker = new Thread(this.Worker); | |
260 worker.IsBackground = true; | |
261 worker.Start(); | |
262 | |
263 return true; | |
264 } else { | |
265 return false; | |
266 } | |
16 | 267 } |
268 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
269 protected abstract void InvokeUnit(TUnit unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
270 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
271 void Worker() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
272 TUnit unit; |
22 | 273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
274 Interlocked.Increment(ref m_activeThreads); |
15 | 275 do { |
276 // exit if requested | |
277 if (m_exitRequired != 0) { | |
278 // release the thread slot | |
20 | 279 Interlocked.Decrement(ref m_activeThreads); |
17 | 280 if (ReleaseThreadSlotAnyway()) // it was the last worker |
15 | 281 m_hasTasks.Dispose(); |
282 else | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
283 SignalThread(); // wake next worker |
15 | 284 unit = default(TUnit); |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
285 break; |
15 | 286 } |
287 | |
288 // fetch task | |
289 if (TryDequeue(out unit)) { | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
290 InvokeUnit(unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
291 continue; |
15 | 292 } |
293 | |
20 | 294 Interlocked.Decrement(ref m_activeThreads); |
15 | 295 |
16 | 296 // entering suspend state |
297 // keep this thread and wait | |
20 | 298 if (!Suspend()) |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
299 break; |
22 | 300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
20 | 301 Interlocked.Increment(ref m_activeThreads); |
15 | 302 } while (true); |
22 | 303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
15 | 304 } |
305 | |
306 protected virtual void Dispose(bool disposing) { | |
307 if (disposing) { | |
308 if (m_exitRequired == 0) { | |
309 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | |
310 return; | |
311 | |
312 // wake sleeping threads | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
313 if (m_createdThreads > 0) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
314 SignalThread(); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
315 else |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
316 m_hasTasks.Dispose(); |
15 | 317 GC.SuppressFinalize(this); |
318 } | |
319 } | |
320 } | |
321 | |
322 public void Dispose() { | |
323 Dispose(true); | |
324 } | |
325 | |
326 ~DispatchPool() { | |
327 Dispose(false); | |
328 } | |
329 } | |
330 } |