Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | 2fc0fbe7d58b |
children | 2c5631b43c7d |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
9 public abstract class DispatchPool<TUnit> : IDisposable { | |
10 readonly int m_minThreads; | |
11 readonly int m_maxThreads; | |
80 | 12 readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
13 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
14 int m_createdThreads = 0; // the current size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
15 int m_activeThreads = 0; // the count of threads which are active |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
16 int m_sleepingThreads = 0; // the count of currently inactive threads |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
17 int m_maxRunningThreads = 0; // the meximum reached size of the pool |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
80 | 19 |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
20 int m_wakeEvents = 0; // the count of wake events |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
21 |
80 | 22 readonly object m_signalLocker = new object(); |
15 | 23 |
24 protected DispatchPool(int min, int max) { | |
25 if (min < 0) | |
26 throw new ArgumentOutOfRangeException("min"); | |
27 if (max <= 0) | |
28 throw new ArgumentOutOfRangeException("max"); | |
29 | |
30 if (min > max) | |
31 min = max; | |
32 m_minThreads = min; | |
33 m_maxThreads = max; | |
34 } | |
35 | |
36 protected DispatchPool(int threads) | |
37 : this(threads, threads) { | |
38 } | |
39 | |
40 protected DispatchPool() { | |
41 int maxThreads, maxCP; | |
42 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
43 | |
44 m_minThreads = 0; | |
45 m_maxThreads = maxThreads; | |
46 } | |
47 | |
48 protected void InitPool() { | |
49 for (int i = 0; i < m_minThreads; i++) | |
50 StartWorker(); | |
51 } | |
52 | |
20 | 53 public int PoolSize { |
15 | 54 get { |
80 | 55 Thread.MemoryBarrier(); |
20 | 56 return m_createdThreads; |
57 } | |
58 } | |
59 | |
60 public int ActiveThreads { | |
61 get { | |
80 | 62 Thread.MemoryBarrier(); |
20 | 63 return m_activeThreads; |
15 | 64 } |
65 } | |
66 | |
67 public int MaxRunningThreads { | |
68 get { | |
80 | 69 Thread.MemoryBarrier(); |
15 | 70 return m_maxRunningThreads; |
71 } | |
72 } | |
73 | |
74 protected bool IsDisposed { | |
75 get { | |
80 | 76 Thread.MemoryBarrier(); |
77 return m_exitRequired == 1; | |
15 | 78 } |
79 } | |
80 | |
17 | 81 protected abstract bool TryDequeue(out TUnit unit); |
82 | |
80 | 83 #region thread signaling traits |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
84 int SignalThread() { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
85 var signals = Interlocked.Increment(ref m_wakeEvents); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
86 if(signals == 1) |
80 | 87 lock(m_signalLocker) |
88 Monitor.Pulse(m_signalLocker); | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
89 return signals; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
90 } |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
91 |
22 | 92 bool FetchSignalOrWait(int timeout) { |
93 var start = Environment.TickCount; | |
80 | 94 int signals; |
95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read | |
22 | 96 do { |
80 | 97 signals = m_wakeEvents; |
98 if (signals == 0) | |
99 break; | |
100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | |
22 | 101 |
80 | 102 if (signals == 0) { |
103 // no signal is fetched | |
104 lock(m_signalLocker) { | |
105 while(m_wakeEvents == 0) { | |
106 if (timeout != -1) | |
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | |
108 if(!Monitor.Wait(m_signalLocker,timeout)) | |
109 return false; // timeout | |
110 } | |
111 // m_wakeEvents > 0 | |
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized | |
113 Monitor.Pulse(m_signalLocker); | |
114 | |
115 // signal fetched | |
22 | 116 return true; |
117 } | |
118 | |
80 | 119 } else { |
120 // signal fetched | |
121 return true; | |
122 } | |
22 | 123 |
124 | |
125 } | |
126 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
127 bool Sleep(int timeout) { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
128 Interlocked.Increment(ref m_sleepingThreads); |
22 | 129 if (FetchSignalOrWait(timeout)) { |
130 Interlocked.Decrement(ref m_sleepingThreads); | |
17 | 131 return true; |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
132 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
133 Interlocked.Decrement(ref m_sleepingThreads); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
134 return false; |
20 | 135 } |
17 | 136 } |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
137 #endregion |
17 | 138 |
139 /// <summary> | |
140 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |
141 /// </summary> | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
142 protected void GrowPool() { |
80 | 143 Thread.MemoryBarrier(); |
144 if (m_exitRequired == 1) | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
145 return; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
146 if (m_sleepingThreads > m_wakeEvents) { |
22 | 147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); |
148 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
149 // all sleeping threads may gone |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
150 SignalThread(); // wake a sleeping thread; |
17 | 151 |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
152 // we can't check whether signal has been processed |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
153 // anyway it may take some time for the thread to start |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
154 // we will ensure that at least one thread is running |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
155 |
30 | 156 EnsurePoolIsAlive(); |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
157 } else { |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
158 // if there is no sleeping threads in the pool |
30 | 159 if (!StartWorker()) { |
160 // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue | |
24 | 161 // send it a signal to spin again |
30 | 162 SignalThread(); |
163 EnsurePoolIsAlive(); | |
164 } | |
165 } | |
166 } | |
167 | |
34 | 168 protected void EnsurePoolIsAlive() { |
30 | 169 if (AllocateThreadSlot(1)) { |
170 // if there were no threads in the pool | |
171 var worker = new Thread(this.Worker); | |
172 worker.IsBackground = true; | |
173 worker.Start(); | |
17 | 174 } |
175 } | |
176 | |
34 | 177 protected virtual bool Suspend() { |
20 | 178 //no tasks left, exit if the thread is no longer needed |
179 bool last; | |
180 bool requestExit; | |
181 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
182 // if threads have a timeout before releasing |
20 | 183 if (m_releaseTimeout > 0) |
184 requestExit = !Sleep(m_releaseTimeout); | |
185 else | |
186 requestExit = true; | |
187 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
188 if (!requestExit) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
189 return true; |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
190 |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
191 // release unsused thread |
20 | 192 if (requestExit && ReleaseThreadSlot(out last)) { |
193 // in case at the moment the last thread was being released | |
194 // a new task was added to the queue, we need to try | |
195 // to revoke the thread to avoid the situation when the task is left unprocessed | |
30 | 196 if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false |
197 SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it | |
198 return AllocateThreadSlot(1); // ensure that at least one thread is alive | |
20 | 199 } |
200 | |
201 return false; | |
202 } | |
203 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
204 // wait till infinity |
20 | 205 Sleep(-1); |
206 | |
207 return true; | |
17 | 208 } |
209 | |
210 #region thread slots traits | |
211 | |
212 bool AllocateThreadSlot() { | |
16 | 213 int current; |
15 | 214 // use spins to allocate slot for the new thread |
215 do { | |
20 | 216 current = m_createdThreads; |
80 | 217 if (current >= m_maxThreads || m_exitRequired == 1) |
15 | 218 // no more slots left or the pool has been disposed |
219 return false; | |
20 | 220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
15 | 221 |
17 | 222 UpdateMaxThreads(current + 1); |
223 | |
224 return true; | |
225 } | |
15 | 226 |
17 | 227 bool AllocateThreadSlot(int desired) { |
20 | 228 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) |
17 | 229 return false; |
230 | |
231 UpdateMaxThreads(desired); | |
15 | 232 |
17 | 233 return true; |
234 } | |
235 | |
236 bool ReleaseThreadSlot(out bool last) { | |
237 last = false; | |
238 int current; | |
239 // use spins to release slot for the new thread | |
80 | 240 Thread.MemoryBarrier(); |
17 | 241 do { |
20 | 242 current = m_createdThreads; |
17 | 243 if (current <= m_minThreads && m_exitRequired == 0) |
244 // the thread is reserved | |
245 return false; | |
20 | 246 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); |
17 | 247 |
248 last = (current == 1); | |
15 | 249 |
250 return true; | |
251 } | |
252 | |
17 | 253 /// <summary> |
254 /// releases thread slot unconditionally, used during cleanup | |
255 /// </summary> | |
256 /// <returns>true - no more threads left</returns> | |
257 bool ReleaseThreadSlotAnyway() { | |
20 | 258 var left = Interlocked.Decrement(ref m_createdThreads); |
17 | 259 return left == 0; |
15 | 260 } |
261 | |
17 | 262 void UpdateMaxThreads(int count) { |
263 int max; | |
16 | 264 do { |
17 | 265 max = m_maxRunningThreads; |
266 if (max >= count) | |
267 break; | |
268 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
16 | 269 } |
270 | |
17 | 271 #endregion |
272 | |
273 bool StartWorker() { | |
274 if (AllocateThreadSlot()) { | |
275 // slot successfully allocated | |
276 var worker = new Thread(this.Worker); | |
277 worker.IsBackground = true; | |
80 | 278 Interlocked.Increment(ref m_activeThreads); |
17 | 279 worker.Start(); |
280 | |
281 return true; | |
282 } else { | |
283 return false; | |
284 } | |
16 | 285 } |
286 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
287 protected abstract void InvokeUnit(TUnit unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
288 |
41 | 289 protected virtual void Worker() { |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
290 TUnit unit; |
22 | 291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); |
80 | 292 int count = 0;; |
293 Thread.MemoryBarrier(); | |
15 | 294 do { |
295 // exit if requested | |
80 | 296 if (m_exitRequired == 1) { |
15 | 297 // release the thread slot |
20 | 298 Interlocked.Decrement(ref m_activeThreads); |
80 | 299 if (!ReleaseThreadSlotAnyway()) // it was the last worker |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
300 SignalThread(); // wake next worker |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
301 break; |
15 | 302 } |
303 | |
304 // fetch task | |
305 if (TryDequeue(out unit)) { | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
306 InvokeUnit(unit); |
80 | 307 count ++; |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
308 continue; |
15 | 309 } |
20 | 310 Interlocked.Decrement(ref m_activeThreads); |
15 | 311 |
80 | 312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); |
16 | 313 // entering suspend state |
314 // keep this thread and wait | |
20 | 315 if (!Suspend()) |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
316 break; |
80 | 317 count = 0; |
22 | 318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
20 | 319 Interlocked.Increment(ref m_activeThreads); |
15 | 320 } while (true); |
22 | 321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
15 | 322 } |
323 | |
324 protected virtual void Dispose(bool disposing) { | |
325 if (disposing) { | |
80 | 326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier |
15 | 327 // wake sleeping threads |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
328 if (m_createdThreads > 0) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
329 SignalThread(); |
15 | 330 GC.SuppressFinalize(this); |
331 } | |
332 } | |
333 } | |
334 | |
335 public void Dispose() { | |
336 Dispose(true); | |
337 } | |
338 | |
339 ~DispatchPool() { | |
340 Dispose(false); | |
341 } | |
342 } | |
343 } |