Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | 2fc0fbe7d58b |
children | 2c5631b43c7d |
comparison
equal
deleted
inserted
replaced
79:05e6468f066f | 80:4f20870d0816 |
---|---|
7 | 7 |
8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
9 public abstract class DispatchPool<TUnit> : IDisposable { | 9 public abstract class DispatchPool<TUnit> : IDisposable { |
10 readonly int m_minThreads; | 10 readonly int m_minThreads; |
11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
12 readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit | |
12 | 13 |
13 int m_createdThreads = 0; // the current size of the pool | 14 int m_createdThreads = 0; // the current size of the pool |
14 int m_activeThreads = 0; // the count of threads which are active | 15 int m_activeThreads = 0; // the count of threads which are active |
15 int m_sleepingThreads = 0; // the count of currently inactive threads | 16 int m_sleepingThreads = 0; // the count of currently inactive threads |
16 int m_maxRunningThreads = 0; // the meximum reached size of the pool | 17 int m_maxRunningThreads = 0; // the meximum reached size of the pool |
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released | 18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit | 19 |
19 int m_wakeEvents = 0; // the count of wake events | 20 int m_wakeEvents = 0; // the count of wake events |
20 | 21 |
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | 22 readonly object m_signalLocker = new object(); |
22 | 23 |
23 protected DispatchPool(int min, int max) { | 24 protected DispatchPool(int min, int max) { |
24 if (min < 0) | 25 if (min < 0) |
25 throw new ArgumentOutOfRangeException("min"); | 26 throw new ArgumentOutOfRangeException("min"); |
26 if (max <= 0) | 27 if (max <= 0) |
49 StartWorker(); | 50 StartWorker(); |
50 } | 51 } |
51 | 52 |
52 public int PoolSize { | 53 public int PoolSize { |
53 get { | 54 get { |
55 Thread.MemoryBarrier(); | |
54 return m_createdThreads; | 56 return m_createdThreads; |
55 } | 57 } |
56 } | 58 } |
57 | 59 |
58 public int ActiveThreads { | 60 public int ActiveThreads { |
59 get { | 61 get { |
62 Thread.MemoryBarrier(); | |
60 return m_activeThreads; | 63 return m_activeThreads; |
61 } | 64 } |
62 } | 65 } |
63 | 66 |
64 public int MaxRunningThreads { | 67 public int MaxRunningThreads { |
65 get { | 68 get { |
69 Thread.MemoryBarrier(); | |
66 return m_maxRunningThreads; | 70 return m_maxRunningThreads; |
67 } | 71 } |
68 } | 72 } |
69 | 73 |
70 protected bool IsDisposed { | 74 protected bool IsDisposed { |
71 get { | 75 get { |
72 return m_exitRequired != 0; | 76 Thread.MemoryBarrier(); |
77 return m_exitRequired == 1; | |
73 } | 78 } |
74 } | 79 } |
75 | 80 |
76 protected abstract bool TryDequeue(out TUnit unit); | 81 protected abstract bool TryDequeue(out TUnit unit); |
77 | 82 |
78 #region thread execution traits | 83 #region thread signaling traits |
79 int SignalThread() { | 84 int SignalThread() { |
80 var signals = Interlocked.Increment(ref m_wakeEvents); | 85 var signals = Interlocked.Increment(ref m_wakeEvents); |
81 if(signals == 1) | 86 if(signals == 1) |
82 m_hasTasks.Set(); | 87 lock(m_signalLocker) |
88 Monitor.Pulse(m_signalLocker); | |
83 return signals; | 89 return signals; |
84 } | 90 } |
85 | 91 |
86 bool FetchSignalOrWait(int timeout) { | 92 bool FetchSignalOrWait(int timeout) { |
87 var start = Environment.TickCount; | 93 var start = Environment.TickCount; |
88 | 94 int signals; |
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен | 95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read |
90 // ее вернуть, чтобы другой ожидающий поток смог | 96 do { |
91 bool hasLock = false; | 97 signals = m_wakeEvents; |
92 do { | 98 if (signals == 0) |
93 int signals; | 99 break; |
94 do { | 100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); |
95 signals = m_wakeEvents; | 101 |
96 if (signals == 0) | 102 if (signals == 0) { |
97 break; | 103 // no signal is fetched |
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); | 104 lock(m_signalLocker) { |
99 | 105 while(m_wakeEvents == 0) { |
100 if (signals >= 1) { | 106 if (timeout != -1) |
101 if (signals > 1 && hasLock) | 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); |
102 m_hasTasks.Set(); | 108 if(!Monitor.Wait(m_signalLocker,timeout)) |
109 return false; // timeout | |
110 } | |
111 // m_wakeEvents > 0 | |
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized | |
113 Monitor.Pulse(m_signalLocker); | |
114 | |
115 // signal fetched | |
103 return true; | 116 return true; |
104 } | 117 } |
105 | 118 |
106 if (timeout != -1) | 119 } else { |
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); | 120 // signal fetched |
108 | 121 return true; |
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие | 122 } |
110 // и уйдет на пустой цикл, после чего заблокируется | 123 |
111 | 124 |
112 hasLock = true; | |
113 } while (m_hasTasks.WaitOne(timeout)); | |
114 | |
115 return false; | |
116 } | 125 } |
117 | 126 |
118 bool Sleep(int timeout) { | 127 bool Sleep(int timeout) { |
119 Interlocked.Increment(ref m_sleepingThreads); | 128 Interlocked.Increment(ref m_sleepingThreads); |
120 if (FetchSignalOrWait(timeout)) { | 129 if (FetchSignalOrWait(timeout)) { |
129 | 138 |
130 /// <summary> | 139 /// <summary> |
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | 140 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока |
132 /// </summary> | 141 /// </summary> |
133 protected void GrowPool() { | 142 protected void GrowPool() { |
134 if (m_exitRequired != 0) | 143 Thread.MemoryBarrier(); |
144 if (m_exitRequired == 1) | |
135 return; | 145 return; |
136 if (m_sleepingThreads > m_wakeEvents) { | 146 if (m_sleepingThreads > m_wakeEvents) { |
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); | 147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); |
138 | 148 |
139 // all sleeping threads may gone | 149 // all sleeping threads may gone |
202 bool AllocateThreadSlot() { | 212 bool AllocateThreadSlot() { |
203 int current; | 213 int current; |
204 // use spins to allocate slot for the new thread | 214 // use spins to allocate slot for the new thread |
205 do { | 215 do { |
206 current = m_createdThreads; | 216 current = m_createdThreads; |
207 if (current >= m_maxThreads || m_exitRequired != 0) | 217 if (current >= m_maxThreads || m_exitRequired == 1) |
208 // no more slots left or the pool has been disposed | 218 // no more slots left or the pool has been disposed |
209 return false; | 219 return false; |
210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); | 220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
211 | 221 |
212 UpdateMaxThreads(current + 1); | 222 UpdateMaxThreads(current + 1); |
225 | 235 |
226 bool ReleaseThreadSlot(out bool last) { | 236 bool ReleaseThreadSlot(out bool last) { |
227 last = false; | 237 last = false; |
228 int current; | 238 int current; |
229 // use spins to release slot for the new thread | 239 // use spins to release slot for the new thread |
240 Thread.MemoryBarrier(); | |
230 do { | 241 do { |
231 current = m_createdThreads; | 242 current = m_createdThreads; |
232 if (current <= m_minThreads && m_exitRequired == 0) | 243 if (current <= m_minThreads && m_exitRequired == 0) |
233 // the thread is reserved | 244 // the thread is reserved |
234 return false; | 245 return false; |
262 bool StartWorker() { | 273 bool StartWorker() { |
263 if (AllocateThreadSlot()) { | 274 if (AllocateThreadSlot()) { |
264 // slot successfully allocated | 275 // slot successfully allocated |
265 var worker = new Thread(this.Worker); | 276 var worker = new Thread(this.Worker); |
266 worker.IsBackground = true; | 277 worker.IsBackground = true; |
278 Interlocked.Increment(ref m_activeThreads); | |
267 worker.Start(); | 279 worker.Start(); |
268 | 280 |
269 return true; | 281 return true; |
270 } else { | 282 } else { |
271 return false; | 283 return false; |
275 protected abstract void InvokeUnit(TUnit unit); | 287 protected abstract void InvokeUnit(TUnit unit); |
276 | 288 |
277 protected virtual void Worker() { | 289 protected virtual void Worker() { |
278 TUnit unit; | 290 TUnit unit; |
279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); | 291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); |
280 Interlocked.Increment(ref m_activeThreads); | 292 int count = 0;; |
293 Thread.MemoryBarrier(); | |
281 do { | 294 do { |
282 // exit if requested | 295 // exit if requested |
283 if (m_exitRequired != 0) { | 296 if (m_exitRequired == 1) { |
284 // release the thread slot | 297 // release the thread slot |
285 Interlocked.Decrement(ref m_activeThreads); | 298 Interlocked.Decrement(ref m_activeThreads); |
286 if (ReleaseThreadSlotAnyway()) // it was the last worker | 299 if (!ReleaseThreadSlotAnyway()) // it was the last worker |
287 m_hasTasks.Dispose(); | |
288 else | |
289 SignalThread(); // wake next worker | 300 SignalThread(); // wake next worker |
290 break; | 301 break; |
291 } | 302 } |
292 | 303 |
293 // fetch task | 304 // fetch task |
294 if (TryDequeue(out unit)) { | 305 if (TryDequeue(out unit)) { |
295 InvokeUnit(unit); | 306 InvokeUnit(unit); |
307 count ++; | |
296 continue; | 308 continue; |
297 } | 309 } |
298 Interlocked.Decrement(ref m_activeThreads); | 310 Interlocked.Decrement(ref m_activeThreads); |
299 | 311 |
312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); | |
300 // entering suspend state | 313 // entering suspend state |
301 // keep this thread and wait | 314 // keep this thread and wait |
302 if (!Suspend()) | 315 if (!Suspend()) |
303 break; | 316 break; |
317 count = 0; | |
304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); | 318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); |
305 Interlocked.Increment(ref m_activeThreads); | 319 Interlocked.Increment(ref m_activeThreads); |
306 } while (true); | 320 } while (true); |
307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); | 321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); |
308 } | 322 } |
309 | 323 |
310 protected virtual void Dispose(bool disposing) { | 324 protected virtual void Dispose(bool disposing) { |
311 if (disposing) { | 325 if (disposing) { |
312 if (m_exitRequired == 0) { | 326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier |
313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | |
314 return; | |
315 | |
316 // wake sleeping threads | 327 // wake sleeping threads |
317 if (m_createdThreads > 0) | 328 if (m_createdThreads > 0) |
318 SignalThread(); | 329 SignalThread(); |
319 else | |
320 m_hasTasks.Dispose(); | |
321 GC.SuppressFinalize(this); | 330 GC.SuppressFinalize(this); |
322 } | 331 } |
323 } | 332 } |
324 } | 333 } |
325 | 334 |