Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 22:5a35900264f5 promises
implemented nonblocking wake signals processing
| author | cin |
|---|---|
| date | Wed, 13 Nov 2013 14:03:00 +0400 |
| parents | 6a56df4ec59e |
| children | ee04e1fa78da |
comparison
equal
deleted
inserted
replaced
| 21:6a56df4ec59e | 22:5a35900264f5 |
|---|---|
| 81 if(signals == 1) | 81 if(signals == 1) |
| 82 m_hasTasks.Set(); | 82 m_hasTasks.Set(); |
| 83 return signals; | 83 return signals; |
| 84 } | 84 } |
| 85 | 85 |
| 86 bool FetchSignalOrWait(int timeout) { | |
| 87 var start = Environment.TickCount; | |
| 88 | |
| 89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | |
| 90 // ее вернуть, чтобы другой ожидающий поток смог | |
| 91 bool hasLock = false; | |
| 92 do { | |
| 93 int signals; | |
| 94 do { | |
| 95 signals = m_wakeEvents; | |
| 96 if (signals == 0) | |
| 97 break; | |
| 98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | |
| 99 | |
| 100 if (signals >= 1) { | |
| 101 if (signals > 1 && hasLock) | |
| 102 m_hasTasks.Set(); | |
| 103 return true; | |
| 104 } | |
| 105 | |
| 106 if (timeout != -1) | |
| 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | |
| 108 | |
| 109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | |
| 110 // и уйдет на пустой цикл, после чего заблокируется | |
| 111 | |
| 112 hasLock = true; | |
| 113 } while (m_hasTasks.WaitOne(timeout)); | |
| 114 | |
| 115 return false; | |
| 116 } | |
| 117 | |
| 86 bool Sleep(int timeout) { | 118 bool Sleep(int timeout) { |
| 87 Interlocked.Increment(ref m_sleepingThreads); | 119 Interlocked.Increment(ref m_sleepingThreads); |
| 88 if (m_hasTasks.WaitOne(timeout)) { | 120 if (FetchSignalOrWait(timeout)) { |
| 89 // this is autoreset event, only one thread can run this block simultaneously | 121 Interlocked.Decrement(ref m_sleepingThreads); |
| 90 var sleeping = Interlocked.Decrement(ref m_sleepingThreads); | |
| 91 if (Interlocked.Decrement(ref m_wakeEvents) > 0) | |
| 92 m_hasTasks.Set(); // wake next worker | |
| 93 | |
| 94 return true; | 122 return true; |
| 95 } else { | 123 } else { |
| 96 Interlocked.Decrement(ref m_sleepingThreads); | 124 Interlocked.Decrement(ref m_sleepingThreads); |
| 97 return false; | 125 return false; |
| 98 } | 126 } |
| 104 /// </summary> | 132 /// </summary> |
| 105 protected void GrowPool() { | 133 protected void GrowPool() { |
| 106 if (m_exitRequired != 0) | 134 if (m_exitRequired != 0) |
| 107 return; | 135 return; |
| 108 if (m_sleepingThreads > m_wakeEvents) { | 136 if (m_sleepingThreads > m_wakeEvents) { |
| 137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); | |
| 138 | |
| 109 // all sleeping threads may gone | 139 // all sleeping threads may gone |
| 110 SignalThread(); // wake a sleeping thread; | 140 SignalThread(); // wake a sleeping thread; |
| 111 | 141 |
| 112 // we can't check whether signal has been processed | 142 // we can't check whether signal has been processed |
| 113 // anyway it may take some time for the thread to start | 143 // anyway it may take some time for the thread to start |
| 127 | 157 |
| 128 private bool Suspend() { | 158 private bool Suspend() { |
| 129 //no tasks left, exit if the thread is no longer needed | 159 //no tasks left, exit if the thread is no longer needed |
| 130 bool last; | 160 bool last; |
| 131 bool requestExit; | 161 bool requestExit; |
| 132 | |
| 133 | |
| 134 | 162 |
| 135 // if threads have a timeout before releasing | 163 // if threads have a timeout before releasing |
| 136 if (m_releaseTimeout > 0) | 164 if (m_releaseTimeout > 0) |
| 137 requestExit = !Sleep(m_releaseTimeout); | 165 requestExit = !Sleep(m_releaseTimeout); |
| 138 else | 166 else |
| 240 | 268 |
| 241 protected abstract void InvokeUnit(TUnit unit); | 269 protected abstract void InvokeUnit(TUnit unit); |
| 242 | 270 |
| 243 void Worker() { | 271 void Worker() { |
| 244 TUnit unit; | 272 TUnit unit; |
| 273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); | |
| 245 Interlocked.Increment(ref m_activeThreads); | 274 Interlocked.Increment(ref m_activeThreads); |
| 246 Sleep(0); // remove wake request if the new thread is started | |
| 247 do { | 275 do { |
| 248 // exit if requested | 276 // exit if requested |
| 249 if (m_exitRequired != 0) { | 277 if (m_exitRequired != 0) { |
| 250 // release the thread slot | 278 // release the thread slot |
| 251 Interlocked.Decrement(ref m_activeThreads); | 279 Interlocked.Decrement(ref m_activeThreads); |
| 267 | 295 |
| 268 // entering suspend state | 296 // entering suspend state |
| 269 // keep this thread and wait | 297 // keep this thread and wait |
| 270 if (!Suspend()) | 298 if (!Suspend()) |
| 271 break; | 299 break; |
| 272 | 300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
| 273 Interlocked.Increment(ref m_activeThreads); | 301 Interlocked.Increment(ref m_activeThreads); |
| 274 } while (true); | 302 } while (true); |
| 275 | 303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
| 276 } | 304 } |
| 277 | 305 |
| 278 protected virtual void Dispose(bool disposing) { | 306 protected virtual void Dispose(bool disposing) { |
| 279 if (disposing) { | 307 if (disposing) { |
| 280 if (m_exitRequired == 0) { | 308 if (m_exitRequired == 0) { |
