Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 22:5a35900264f5 promises
implemented nonblocking wake singnals processing
author | cin |
---|---|
date | Wed, 13 Nov 2013 14:03:00 +0400 |
parents | 6a56df4ec59e |
children | ee04e1fa78da |
comparison
equal
deleted
inserted
replaced
21:6a56df4ec59e | 22:5a35900264f5 |
---|---|
81 if(signals == 1) | 81 if(signals == 1) |
82 m_hasTasks.Set(); | 82 m_hasTasks.Set(); |
83 return signals; | 83 return signals; |
84 } | 84 } |
85 | 85 |
86 bool FetchSignalOrWait(int timeout) { | |
87 var start = Environment.TickCount; | |
88 | |
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | |
90 // ее вернуть, чтобы другой ожидающий поток смог | |
91 bool hasLock = false; | |
92 do { | |
93 int signals; | |
94 do { | |
95 signals = m_wakeEvents; | |
96 if (signals == 0) | |
97 break; | |
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | |
99 | |
100 if (signals >= 1) { | |
101 if (signals > 1 && hasLock) | |
102 m_hasTasks.Set(); | |
103 return true; | |
104 } | |
105 | |
106 if (timeout != -1) | |
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | |
108 | |
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | |
110 // и уйдет на пустой цикл, после чего заблокируется | |
111 | |
112 hasLock = true; | |
113 } while (m_hasTasks.WaitOne(timeout)); | |
114 | |
115 return false; | |
116 } | |
117 | |
86 bool Sleep(int timeout) { | 118 bool Sleep(int timeout) { |
87 Interlocked.Increment(ref m_sleepingThreads); | 119 Interlocked.Increment(ref m_sleepingThreads); |
88 if (m_hasTasks.WaitOne(timeout)) { | 120 if (FetchSignalOrWait(timeout)) { |
89 // this is autoreset event, only one thread can run this block simultaneously | 121 Interlocked.Decrement(ref m_sleepingThreads); |
90 var sleeping = Interlocked.Decrement(ref m_sleepingThreads); | |
91 if (Interlocked.Decrement(ref m_wakeEvents) > 0) | |
92 m_hasTasks.Set(); // wake next worker | |
93 | |
94 return true; | 122 return true; |
95 } else { | 123 } else { |
96 Interlocked.Decrement(ref m_sleepingThreads); | 124 Interlocked.Decrement(ref m_sleepingThreads); |
97 return false; | 125 return false; |
98 } | 126 } |
104 /// </summary> | 132 /// </summary> |
105 protected void GrowPool() { | 133 protected void GrowPool() { |
106 if (m_exitRequired != 0) | 134 if (m_exitRequired != 0) |
107 return; | 135 return; |
108 if (m_sleepingThreads > m_wakeEvents) { | 136 if (m_sleepingThreads > m_wakeEvents) { |
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); | |
138 | |
109 // all sleeping threads may gone | 139 // all sleeping threads may gone |
110 SignalThread(); // wake a sleeping thread; | 140 SignalThread(); // wake a sleeping thread; |
111 | 141 |
112 // we can't check whether signal has been processed | 142 // we can't check whether signal has been processed |
113 // anyway it may take some time for the thread to start | 143 // anyway it may take some time for the thread to start |
127 | 157 |
128 private bool Suspend() { | 158 private bool Suspend() { |
129 //no tasks left, exit if the thread is no longer needed | 159 //no tasks left, exit if the thread is no longer needed |
130 bool last; | 160 bool last; |
131 bool requestExit; | 161 bool requestExit; |
132 | |
133 | |
134 | 162 |
135 // if threads have a timeout before releasing | 163 // if threads have a timeout before releasing |
136 if (m_releaseTimeout > 0) | 164 if (m_releaseTimeout > 0) |
137 requestExit = !Sleep(m_releaseTimeout); | 165 requestExit = !Sleep(m_releaseTimeout); |
138 else | 166 else |
240 | 268 |
241 protected abstract void InvokeUnit(TUnit unit); | 269 protected abstract void InvokeUnit(TUnit unit); |
242 | 270 |
243 void Worker() { | 271 void Worker() { |
244 TUnit unit; | 272 TUnit unit; |
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); | |
245 Interlocked.Increment(ref m_activeThreads); | 274 Interlocked.Increment(ref m_activeThreads); |
246 Sleep(0); // remove wake request if the new thread is started | |
247 do { | 275 do { |
248 // exit if requested | 276 // exit if requested |
249 if (m_exitRequired != 0) { | 277 if (m_exitRequired != 0) { |
250 // release the thread slot | 278 // release the thread slot |
251 Interlocked.Decrement(ref m_activeThreads); | 279 Interlocked.Decrement(ref m_activeThreads); |
267 | 295 |
268 // entering suspend state | 296 // entering suspend state |
269 // keep this thread and wait | 297 // keep this thread and wait |
270 if (!Suspend()) | 298 if (!Suspend()) |
271 break; | 299 break; |
272 | 300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
273 Interlocked.Increment(ref m_activeThreads); | 301 Interlocked.Increment(ref m_activeThreads); |
274 } while (true); | 302 } while (true); |
275 | 303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
276 } | 304 } |
277 | 305 |
278 protected virtual void Dispose(bool disposing) { | 306 protected virtual void Dispose(bool disposing) { |
279 if (disposing) { | 307 if (disposing) { |
280 if (m_exitRequired == 0) { | 308 if (m_exitRequired == 0) { |