Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2
added memory barriers
| author | cin |
|---|---|
| date | Fri, 26 Sep 2014 03:32:34 +0400 |
| parents | 2fc0fbe7d58b |
| children | 2c5631b43c7d |
comparison
equal
deleted
inserted
replaced
| 79:05e6468f066f | 80:4f20870d0816 |
|---|---|
| 7 | 7 |
| 8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
| 9 public abstract class DispatchPool<TUnit> : IDisposable { | 9 public abstract class DispatchPool<TUnit> : IDisposable { |
| 10 readonly int m_minThreads; | 10 readonly int m_minThreads; |
| 11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
| 12 readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit | |
| 12 | 13 |
| 13 int m_createdThreads = 0; // the current size of the pool | 14 int m_createdThreads = 0; // the current size of the pool |
| 14 int m_activeThreads = 0; // the count of threads which are active | 15 int m_activeThreads = 0; // the count of threads which are active |
| 15 int m_sleepingThreads = 0; // the count of currently inactive threads | 16 int m_sleepingThreads = 0; // the count of currently inactive threads |
| 16 int m_maxRunningThreads = 0; // the maximum reached size of the pool | 17 int m_maxRunningThreads = 0; // the maximum reached size of the pool |
| 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released | 18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
| 18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit | 19 |
| 19 int m_wakeEvents = 0; // the count of wake events | 20 int m_wakeEvents = 0; // the count of wake events |
| 20 | 21 |
| 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | 22 readonly object m_signalLocker = new object(); |
| 22 | 23 |
| 23 protected DispatchPool(int min, int max) { | 24 protected DispatchPool(int min, int max) { |
| 24 if (min < 0) | 25 if (min < 0) |
| 25 throw new ArgumentOutOfRangeException("min"); | 26 throw new ArgumentOutOfRangeException("min"); |
| 26 if (max <= 0) | 27 if (max <= 0) |
| 49 StartWorker(); | 50 StartWorker(); |
| 50 } | 51 } |
| 51 | 52 |
| 52 public int PoolSize { | 53 public int PoolSize { |
| 53 get { | 54 get { |
| 55 Thread.MemoryBarrier(); | |
| 54 return m_createdThreads; | 56 return m_createdThreads; |
| 55 } | 57 } |
| 56 } | 58 } |
| 57 | 59 |
| 58 public int ActiveThreads { | 60 public int ActiveThreads { |
| 59 get { | 61 get { |
| 62 Thread.MemoryBarrier(); | |
| 60 return m_activeThreads; | 63 return m_activeThreads; |
| 61 } | 64 } |
| 62 } | 65 } |
| 63 | 66 |
| 64 public int MaxRunningThreads { | 67 public int MaxRunningThreads { |
| 65 get { | 68 get { |
| 69 Thread.MemoryBarrier(); | |
| 66 return m_maxRunningThreads; | 70 return m_maxRunningThreads; |
| 67 } | 71 } |
| 68 } | 72 } |
| 69 | 73 |
| 70 protected bool IsDisposed { | 74 protected bool IsDisposed { |
| 71 get { | 75 get { |
| 72 return m_exitRequired != 0; | 76 Thread.MemoryBarrier(); |
| 77 return m_exitRequired == 1; | |
| 73 } | 78 } |
| 74 } | 79 } |
| 75 | 80 |
| 76 protected abstract bool TryDequeue(out TUnit unit); | 81 protected abstract bool TryDequeue(out TUnit unit); |
| 77 | 82 |
| 78 #region thread execution traits | 83 #region thread signaling traits |
| 79 int SignalThread() { | 84 int SignalThread() { |
| 80 var signals = Interlocked.Increment(ref m_wakeEvents); | 85 var signals = Interlocked.Increment(ref m_wakeEvents); |
| 81 if(signals == 1) | 86 if(signals == 1) |
| 82 m_hasTasks.Set(); | 87 lock(m_signalLocker) |
| 88 Monitor.Pulse(m_signalLocker); | |
| 83 return signals; | 89 return signals; |
| 84 } | 90 } |
| 85 | 91 |
| 86 bool FetchSignalOrWait(int timeout) { | 92 bool FetchSignalOrWait(int timeout) { |
| 87 var start = Environment.TickCount; | 93 var start = Environment.TickCount; |
| 88 | 94 int signals; |
| 89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | 95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read |
| 90 // ее вернуть, чтобы другой ожидающий поток смог | 96 do { |
| 91 bool hasLock = false; | 97 signals = m_wakeEvents; |
| 92 do { | 98 if (signals == 0) |
| 93 int signals; | 99 break; |
| 94 do { | 100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); |
| 95 signals = m_wakeEvents; | 101 |
| 96 if (signals == 0) | 102 if (signals == 0) { |
| 97 break; | 103 // no signal is fetched |
| 98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | 104 lock(m_signalLocker) { |
| 99 | 105 while(m_wakeEvents == 0) { |
| 100 if (signals >= 1) { | 106 if (timeout != -1) |
| 101 if (signals > 1 && hasLock) | 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); |
| 102 m_hasTasks.Set(); | 108 if(!Monitor.Wait(m_signalLocker,timeout)) |
| 109 return false; // timeout | |
| 110 } | |
| 111 // m_wakeEvents > 0 | |
| 112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //synchronized | |
| 113 Monitor.Pulse(m_signalLocker); | |
| 114 | |
| 115 // signal fetched | |
| 103 return true; | 116 return true; |
| 104 } | 117 } |
| 105 | 118 |
| 106 if (timeout != -1) | 119 } else { |
| 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | 120 // signal fetched |
| 108 | 121 return true; |
| 109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | 122 } |
| 110 // и уйдет на пустой цикл, после чего заблокируется | 123 |
| 111 | 124 |
| 112 hasLock = true; | |
| 113 } while (m_hasTasks.WaitOne(timeout)); | |
| 114 | |
| 115 return false; | |
| 116 } | 125 } |
| 117 | 126 |
| 118 bool Sleep(int timeout) { | 127 bool Sleep(int timeout) { |
| 119 Interlocked.Increment(ref m_sleepingThreads); | 128 Interlocked.Increment(ref m_sleepingThreads); |
| 120 if (FetchSignalOrWait(timeout)) { | 129 if (FetchSignalOrWait(timeout)) { |
| 129 | 138 |
| 130 /// <summary> | 139 /// <summary> |
| 131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | 140 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока |
| 132 /// </summary> | 141 /// </summary> |
| 133 protected void GrowPool() { | 142 protected void GrowPool() { |
| 134 if (m_exitRequired != 0) | 143 Thread.MemoryBarrier(); |
| 144 if (m_exitRequired == 1) | |
| 135 return; | 145 return; |
| 136 if (m_sleepingThreads > m_wakeEvents) { | 146 if (m_sleepingThreads > m_wakeEvents) { |
| 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); | 147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); |
| 138 | 148 |
| 139 // all sleeping threads may be gone | 149 // all sleeping threads may be gone |
| 202 bool AllocateThreadSlot() { | 212 bool AllocateThreadSlot() { |
| 203 int current; | 213 int current; |
| 204 // use spins to allocate slot for the new thread | 214 // use spins to allocate slot for the new thread |
| 205 do { | 215 do { |
| 206 current = m_createdThreads; | 216 current = m_createdThreads; |
| 207 if (current >= m_maxThreads || m_exitRequired != 0) | 217 if (current >= m_maxThreads || m_exitRequired == 1) |
| 208 // no more slots left or the pool has been disposed | 218 // no more slots left or the pool has been disposed |
| 209 return false; | 219 return false; |
| 210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); | 220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
| 211 | 221 |
| 212 UpdateMaxThreads(current + 1); | 222 UpdateMaxThreads(current + 1); |
| 225 | 235 |
| 226 bool ReleaseThreadSlot(out bool last) { | 236 bool ReleaseThreadSlot(out bool last) { |
| 227 last = false; | 237 last = false; |
| 228 int current; | 238 int current; |
| 229 // use spins to release slot for the new thread | 239 // use spins to release slot for the new thread |
| 240 Thread.MemoryBarrier(); | |
| 230 do { | 241 do { |
| 231 current = m_createdThreads; | 242 current = m_createdThreads; |
| 232 if (current <= m_minThreads && m_exitRequired == 0) | 243 if (current <= m_minThreads && m_exitRequired == 0) |
| 233 // the thread is reserved | 244 // the thread is reserved |
| 234 return false; | 245 return false; |
| 262 bool StartWorker() { | 273 bool StartWorker() { |
| 263 if (AllocateThreadSlot()) { | 274 if (AllocateThreadSlot()) { |
| 264 // slot successfully allocated | 275 // slot successfully allocated |
| 265 var worker = new Thread(this.Worker); | 276 var worker = new Thread(this.Worker); |
| 266 worker.IsBackground = true; | 277 worker.IsBackground = true; |
| 278 Interlocked.Increment(ref m_activeThreads); | |
| 267 worker.Start(); | 279 worker.Start(); |
| 268 | 280 |
| 269 return true; | 281 return true; |
| 270 } else { | 282 } else { |
| 271 return false; | 283 return false; |
| 275 protected abstract void InvokeUnit(TUnit unit); | 287 protected abstract void InvokeUnit(TUnit unit); |
| 276 | 288 |
| 277 protected virtual void Worker() { | 289 protected virtual void Worker() { |
| 278 TUnit unit; | 290 TUnit unit; |
| 279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); | 291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); |
| 280 Interlocked.Increment(ref m_activeThreads); | 292 int count = 0;; |
| 293 Thread.MemoryBarrier(); | |
| 281 do { | 294 do { |
| 282 // exit if requested | 295 // exit if requested |
| 283 if (m_exitRequired != 0) { | 296 if (m_exitRequired == 1) { |
| 284 // release the thread slot | 297 // release the thread slot |
| 285 Interlocked.Decrement(ref m_activeThreads); | 298 Interlocked.Decrement(ref m_activeThreads); |
| 286 if (ReleaseThreadSlotAnyway()) // it was the last worker | 299 if (!ReleaseThreadSlotAnyway()) // it was the last worker |
| 287 m_hasTasks.Dispose(); | |
| 288 else | |
| 289 SignalThread(); // wake next worker | 300 SignalThread(); // wake next worker |
| 290 break; | 301 break; |
| 291 } | 302 } |
| 292 | 303 |
| 293 // fetch task | 304 // fetch task |
| 294 if (TryDequeue(out unit)) { | 305 if (TryDequeue(out unit)) { |
| 295 InvokeUnit(unit); | 306 InvokeUnit(unit); |
| 307 count ++; | |
| 296 continue; | 308 continue; |
| 297 } | 309 } |
| 298 Interlocked.Decrement(ref m_activeThreads); | 310 Interlocked.Decrement(ref m_activeThreads); |
| 299 | 311 |
| 312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); | |
| 300 // entering suspend state | 313 // entering suspend state |
| 301 // keep this thread and wait | 314 // keep this thread and wait |
| 302 if (!Suspend()) | 315 if (!Suspend()) |
| 303 break; | 316 break; |
| 317 count = 0; | |
| 304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); | 318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
| 305 Interlocked.Increment(ref m_activeThreads); | 319 Interlocked.Increment(ref m_activeThreads); |
| 306 } while (true); | 320 } while (true); |
| 307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); | 321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
| 308 } | 322 } |
| 309 | 323 |
| 310 protected virtual void Dispose(bool disposing) { | 324 protected virtual void Dispose(bool disposing) { |
| 311 if (disposing) { | 325 if (disposing) { |
| 312 if (m_exitRequired == 0) { | 326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier |
| 313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | |
| 314 return; | |
| 315 | |
| 316 // wake sleeping threads | 327 // wake sleeping threads |
| 317 if (m_createdThreads > 0) | 328 if (m_createdThreads > 0) |
| 318 SignalThread(); | 329 SignalThread(); |
| 319 else | |
| 320 m_hasTasks.Dispose(); | |
| 321 GC.SuppressFinalize(this); | 330 GC.SuppressFinalize(this); |
| 322 } | 331 } |
| 323 } | 332 } |
| 324 } | 333 } |
| 325 | 334 |
