Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 24:ee04e1fa78da
fixed dispatch pool race condition
| author | cin |
|---|---|
| date | Thu, 14 Nov 2013 01:15:07 +0400 |
| parents | 5a35900264f5 |
| children | 2fad2d1f4b03 |
| rev | line source |
|---|---|
| 15 | 1 using System; |
| 2 using System.Collections.Generic; | |
| 3 using System.Linq; | |
| 4 using System.Text; | |
| 5 using System.Threading; | |
| 6 using System.Diagnostics; | |
| 7 | |
| 8 namespace Implab.Parallels { | |
| 9 public abstract class DispatchPool<TUnit> : IDisposable { | |
// lower bound of the pool size: ReleaseThreadSlot refuses to go to or below it while the pool is alive
readonly int m_minThreads;
// upper bound of the pool size: AllocateThreadSlot refuses to exceed it
readonly int m_maxThreads;

int m_createdThreads = 0; // the current size of the pool
int m_activeThreads = 0; // the count of threads which are active
int m_sleepingThreads = 0; // the count of currently inactive threads
int m_maxRunningThreads = 0; // the maximum reached size of the pool
int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
int m_releaseTimeout = 100; // the timeout (in milliseconds) a worker thread waits for new tasks before it exits
int m_wakeEvents = 0; // the count of wake events

// signaled by SignalThread when the first pending wake event is posted; workers wait on it in FetchSignalOrWait
AutoResetEvent m_hasTasks = new AutoResetEvent(false);
| 22 | |
/// <summary>
/// Creates a pool whose size is kept between <paramref name="min"/> and <paramref name="max"/> threads.
/// No threads are started here; see <c>InitPool</c>.
/// </summary>
/// <param name="min">Minimum number of threads; must not be negative. A value above <paramref name="max"/> is clamped to it.</param>
/// <param name="max">Maximum number of threads; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0) {
        throw new ArgumentOutOfRangeException("min");
    }
    if (max <= 0) {
        throw new ArgumentOutOfRangeException("max");
    }

    // a lower bound above the upper bound is clamped rather than rejected
    m_maxThreads = max;
    m_minThreads = Math.Min(min, max);
}
| 34 | |
/// <summary>
/// Creates a pool with the same minimum and maximum thread count.
/// </summary>
/// <param name="threads">Value passed as both the minimum and the maximum to the two-argument constructor.</param>
protected DispatchPool(int threads)
    : this(threads, threads) {
}
| 38 | |
/// <summary>
/// Creates a pool with no reserved threads whose upper bound is the worker-thread
/// limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit;
    int completionPortLimit; // reported by the API but not used here
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_maxThreads = workerLimit;
    m_minThreads = 0;
}
| 46 | |
/// <summary>
/// Starts the reserved workers: calls <c>StartWorker</c> once per minimum thread.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining > 0) {
        StartWorker();
        remaining--;
    }
}
| 51 | |
/// <summary>
/// The current size of the pool (the number of allocated thread slots).
/// </summary>
public int PoolSize {
    get {
        return m_createdThreads;
    }
}
| 57 | |
/// <summary>
/// The number of worker threads currently counted as active, i.e. running the
/// worker loop and not suspended.
/// </summary>
public int ActiveThreads {
    get {
        return m_activeThreads;
    }
}
| 63 | |
/// <summary>
/// The largest pool size reached so far.
/// </summary>
public int MaxRunningThreads {
    get {
        return m_maxRunningThreads;
    }
}
| 69 | |
/// <summary>
/// True once shutdown of the pool has been requested (the exit flag is set by <c>Dispose</c>).
/// </summary>
protected bool IsDisposed {
    get {
        return m_exitRequired != 0;
    }
}
| 75 | |
/// <summary>
/// Attempts to fetch the next unit of work. The worker loop calls this; when it
/// returns false the worker enters the suspend state.
/// </summary>
/// <param name="unit">Receives the fetched unit when the method returns true.</param>
/// <returns>true if a unit was fetched; otherwise false.</returns>
// NOTE(review): several workers may call this at the same time - implementations presumably must be thread-safe; confirm.
protected abstract bool TryDequeue(out TUnit unit);
| 77 | |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
78 #region thread execution traits |
|
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
/// <summary>
/// Posts one wake event for the workers.
/// </summary>
/// <returns>The number of pending wake events after this one was added.</returns>
int SignalThread() {
    int pending = Interlocked.Increment(ref m_wakeEvents);

    // only the transition from zero to one pending event sets the wait handle
    if (pending == 1) {
        m_hasTasks.Set();
    }

    return pending;
}
|
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
85 |
/// <summary>
/// Consumes one pending wake event, waiting on the pool's event until one is
/// available or the timeout elapses.
/// </summary>
/// <param name="timeout">Maximum total time to wait, in milliseconds. 0 polls without blocking; -1 waits without a time limit.</param>
/// <returns>true if a wake event was consumed; false if the wait timed out.</returns>
bool FetchSignalOrWait(int timeout) {
    var start = Environment.TickCount;

    // Wait budget for the next WaitOne call. It is always derived from the ORIGINAL
    // timeout minus the time elapsed since 'start'; subtracting the elapsed time from
    // an already reduced value would count earlier waits more than once.
    var remaining = timeout;

    // Means that this thread has already waited on the event (it "owns the lock"), so
    // when it successfully obtains a signal it must give the event back so that another
    // waiting thread can proceed.
    bool hasLock = false;
    do {
        int signals;
        // try to take one wake event with a compare-and-swap loop
        do {
            signals = m_wakeEvents;
            if (signals == 0)
                break;
        } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);

        if (signals >= 1) {
            // more events are pending and we consumed the event's signaled state: re-arm it
            if (signals > 1 && hasLock)
                m_hasTasks.Set();
            return true;
        }

        if (timeout != -1)
            // unchecked: the tick counter wraps around, the difference is still the elapsed time
            remaining = Math.Max(0, timeout - unchecked(Environment.TickCount - start));

        // If there are no signals left, the first thread that gets here resets the event
        // (by waiting on it), runs one empty pass of the loop and then blocks.

        hasLock = true;
    } while (m_hasTasks.WaitOne(remaining));

    return false;
}
| 117 | |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
/// <summary>
/// Registers the calling thread as sleeping while it tries to obtain a wake event.
/// </summary>
/// <param name="timeout">Timeout passed through to <c>FetchSignalOrWait</c>.</param>
/// <returns>true if a wake event was obtained; false if the wait timed out.</returns>
bool Sleep(int timeout) {
    Interlocked.Increment(ref m_sleepingThreads);
    bool signaled = FetchSignalOrWait(timeout);
    // the thread stops being counted as sleeping whatever the outcome was
    Interlocked.Decrement(ref m_sleepingThreads);
    return signaled;
}
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
128 #endregion |
| 17 | 129 |
/// <summary>
/// Either wakes one sleeping thread by posting a wake event, or starts a new
/// thread when there is nobody to wake. Does nothing once the pool is shutting down.
/// </summary>
protected void GrowPool() {
    if (m_exitRequired != 0)
        return;

    if (m_sleepingThreads <= m_wakeEvents) {
        // no sleeping thread is left without a pending wake event: try to add a worker
        if (!StartWorker()) {
            // no new thread was started, but a current one may be on its way to sleep
            // and would miss the queue; send a signal so that it spins again
            SignalThread();
        }
        return;
    }

    // there are more sleeping threads than pending wake events: wake one of them
    // (the sleeping threads may all be gone by now)
    SignalThread();

    // we can't check whether the signal has been processed, and it may take the
    // thread some time to start anyway, so make sure at least one thread exists
    if (AllocateThreadSlot(1)) {
        // the pool was empty: this slot is its first thread
        var worker = new Thread(this.Worker) { IsBackground = true };
        worker.Start();
    }
}
| 160 | |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
/// <summary>
/// Called by a worker that found no task: waits for a wake event and decides
/// whether the worker should keep running or exit.
/// </summary>
/// <returns>true if the worker should spin again; false if its slot was released and it must exit.</returns>
private bool Suspend() {
    // if workers have a release timeout, first wait that long for a wake event
    if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout))
        return true;

    // nothing arrived (or there is no timeout): try to release this unused thread
    bool wasLast;
    if (!ReleaseThreadSlot(out wasLast)) {
        // the slot could not be released: wait without a time limit
        Sleep(-1);
        return true;
    }

    // If a new task was queued at the moment the last thread was being released,
    // try to revoke the release so that the task is not left unprocessed.
    // Sleep(0) fetches a pending wake event or returns false immediately.
    if (wasLast && Sleep(0)) {
        if (AllocateThreadSlot(1))
            return true; // spin again...

        // Sleep(0) has consumed the wake event, so it has to be posted again
        SignalThread();
    }

    return false;
}
| 196 | |
| 197 #region thread slots traits | |
| 198 | |
/// <summary>
/// Reserves a slot for one more thread using a compare-and-swap retry loop.
/// </summary>
/// <returns>true if a slot was reserved; false if the pool is at its maximum size or is shutting down.</returns>
bool AllocateThreadSlot() {
    while (true) {
        int observed = m_createdThreads;

        // no more slots left or the pool has been disposed
        if (observed >= m_maxThreads || m_exitRequired != 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
        // another thread changed the counter in between: retry
    }
}
| 15 | 213 |
/// <summary>
/// Reserves the slot that brings the pool to exactly <paramref name="desired"/> threads.
/// Makes a single attempt, which succeeds only if the pool currently has
/// <paramref name="desired"/> - 1 threads.
/// </summary>
/// <param name="desired">The pool size to reach.</param>
/// <returns>true if the slot was reserved; otherwise false.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;
    bool reserved = Interlocked.CompareExchange(ref m_createdThreads, desired, expected) == expected;

    if (reserved)
        UpdateMaxThreads(desired);

    return reserved;
}
| 222 | |
/// <summary>
/// Releases one thread slot using a compare-and-swap retry loop, unless the slot is reserved.
/// </summary>
/// <param name="last">Set to true when the released slot was the only one left; false otherwise.</param>
/// <returns>
/// true if a slot was released; false if the pool is at or below its minimum size
/// and is not shutting down.
/// </returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;

    for (;;) {
        int observed = m_createdThreads;

        // the thread is reserved
        if (observed <= m_minThreads && m_exitRequired == 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
        // another thread changed the counter in between: retry
    }
}
| 238 | |
/// <summary>
/// Releases a thread slot unconditionally, ignoring the minimum pool size; used during cleanup.
/// </summary>
/// <returns>true - no more threads left</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_createdThreads) == 0;
}
| 247 | |
/// <summary>
/// Raises the recorded maximum pool size to <paramref name="count"/> if it is currently lower.
/// </summary>
/// <param name="count">The pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    while (true) {
        int recorded = m_maxRunningThreads;

        // the recorded maximum is already at least as large
        if (recorded >= count)
            return;

        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, recorded) == recorded)
            return;
        // lost the race with another update: re-read and retry
    }
}
| 256 | |
| 17 | 257 #endregion |
| 258 | |
/// <summary>
/// Starts a new background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns>true if a thread was started; false if no slot was available.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot())
        return false;

    // slot successfully allocated
    var thread = new Thread(this.Worker) { IsBackground = true };
    thread.Start();

    return true;
}
| 271 | |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
/// <summary>
/// Processes one unit of work. The worker loop calls this on a worker thread
/// for every unit obtained from <c>TryDequeue</c>.
/// </summary>
/// <param name="unit">The unit to process.</param>
// NOTE(review): the worker loop does not catch exceptions thrown from here - confirm that implementations handle their own errors.
protected abstract void InvokeUnit(TUnit unit);
|
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
273 |
|
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
/// <summary>
/// Body of a worker thread: processes units while they are available, suspends
/// when there are none, and exits when the pool shuts down or the thread is released.
/// </summary>
void Worker() {
    TUnit unit;

    Interlocked.Increment(ref m_activeThreads);
    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            Interlocked.Decrement(ref m_activeThreads);

            // release the thread slot
            if (ReleaseThreadSlotAnyway()) {
                // it was the last worker
                m_hasTasks.Dispose();
            } else {
                // wake next worker
                SignalThread();
            }
            return;
        }

        // fetch task
        if (TryDequeue(out unit)) {
            InvokeUnit(unit);
            continue;
        }

        // nothing to do: this thread is not counted as active while it is suspended
        Interlocked.Decrement(ref m_activeThreads);

        // entering suspend state; a false result means the thread has been released
        if (!Suspend())
            return;

        Interlocked.Increment(ref m_activeThreads);
    }
}
| 307 | |
/// <summary>
/// Requests shutdown of the pool. Only the first call with <paramref name="disposing"/>
/// set to true has any effect.
/// </summary>
/// <param name="disposing">true when called from <c>Dispose()</c>; when false this method does nothing.</param>
protected virtual void Dispose(bool disposing) {
    if (!disposing)
        return;

    // quick check first, then atomically claim the right to shut the pool down
    if (m_exitRequired != 0)
        return;
    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
        return;

    if (m_createdThreads > 0) {
        // wake sleeping threads
        SignalThread();
    } else {
        // no threads exist, so dispose the event here
        m_hasTasks.Dispose();
    }

    GC.SuppressFinalize(this);
}
| 323 | |
/// <summary>
/// Shuts the pool down by calling <c>Dispose(true)</c>.
/// </summary>
public void Dispose() {
    Dispose(true);
}
| 327 | |
/// <summary>
/// Finalizer: calls <c>Dispose(false)</c>, which performs no work in this class
/// because <c>Dispose(bool)</c> acts only when its argument is true.
/// </summary>
~DispatchPool() {
    Dispose(false);
}
| 331 } | |
| 332 } |
