comparison Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents 2fc0fbe7d58b
children 2c5631b43c7d
comparison
equal deleted inserted replaced
79:05e6468f066f 80:4f20870d0816
7 7
8 namespace Implab.Parallels { 8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable { 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads; 10 readonly int m_minThreads;
11 readonly int m_maxThreads; 11 readonly int m_maxThreads;
12 readonly int m_releaseTimeout = 100; // how long an idle worker thread waits for new tasks before exiting
12 13
13 int m_createdThreads = 0; // the current size of the pool 14 int m_createdThreads = 0; // the current size of the pool
14 int m_activeThreads = 0; // the count of threads which are active 15 int m_activeThreads = 0; // the count of threads which are active
15 int m_sleepingThreads = 0; // the count of currently inactive threads 16 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_maxRunningThreads = 0; // the maximum reached size of the pool 17 int m_maxRunningThreads = 0; // the maximum reached size of the pool
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released 18 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // how long an idle worker thread waits for new tasks before exiting 19
19 int m_wakeEvents = 0; // the count of wake events 20 int m_wakeEvents = 0; // the count of wake events
20 21
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); 22 readonly object m_signalLocker = new object();
22 23
23 protected DispatchPool(int min, int max) { 24 protected DispatchPool(int min, int max) {
24 if (min < 0) 25 if (min < 0)
25 throw new ArgumentOutOfRangeException("min"); 26 throw new ArgumentOutOfRangeException("min");
26 if (max <= 0) 27 if (max <= 0)
49 StartWorker(); 50 StartWorker();
50 } 51 }
51 52
52 public int PoolSize { 53 public int PoolSize {
53 get { 54 get {
55 Thread.MemoryBarrier();
54 return m_createdThreads; 56 return m_createdThreads;
55 } 57 }
56 } 58 }
57 59
58 public int ActiveThreads { 60 public int ActiveThreads {
59 get { 61 get {
62 Thread.MemoryBarrier();
60 return m_activeThreads; 63 return m_activeThreads;
61 } 64 }
62 } 65 }
63 66
64 public int MaxRunningThreads { 67 public int MaxRunningThreads {
65 get { 68 get {
69 Thread.MemoryBarrier();
66 return m_maxRunningThreads; 70 return m_maxRunningThreads;
67 } 71 }
68 } 72 }
69 73
70 protected bool IsDisposed { 74 protected bool IsDisposed {
71 get { 75 get {
72 return m_exitRequired != 0; 76 Thread.MemoryBarrier();
77 return m_exitRequired == 1;
73 } 78 }
74 } 79 }
75 80
76 protected abstract bool TryDequeue(out TUnit unit); 81 protected abstract bool TryDequeue(out TUnit unit);
77 82
78 #region thread execution traits 83 #region thread signaling traits
79 int SignalThread() { 84 int SignalThread() {
80 var signals = Interlocked.Increment(ref m_wakeEvents); 85 var signals = Interlocked.Increment(ref m_wakeEvents);
81 if(signals == 1) 86 if(signals == 1)
82 m_hasTasks.Set(); 87 lock(m_signalLocker)
88 Monitor.Pulse(m_signalLocker);
83 return signals; 89 return signals;
84 } 90 }
85 91
86 bool FetchSignalOrWait(int timeout) { 92 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount; 93 var start = Environment.TickCount;
88 94 int signals;
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен 95 Thread.MemoryBarrier(); // m_wakeEvents volatile first read
90 // ее вернуть, чтобы другой ожидающий поток смог 96 do {
91 bool hasLock = false; 97 signals = m_wakeEvents;
92 do { 98 if (signals == 0)
93 int signals; 99 break;
94 do { 100 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
95 signals = m_wakeEvents; 101
96 if (signals == 0) 102 if (signals == 0) {
97 break; 103 // no signal is fetched
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); 104 lock(m_signalLocker) {
99 105 while(m_wakeEvents == 0) {
100 if (signals >= 1) { 106 if (timeout != -1)
101 if (signals > 1 && hasLock) 107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
102 m_hasTasks.Set(); 108 if(!Monitor.Wait(m_signalLocker,timeout))
109 return false; // timeout
110 }
111 // m_wakeEvents > 0
112 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //synchronized
113 Monitor.Pulse(m_signalLocker);
114
115 // signal fetched
103 return true; 116 return true;
104 } 117 }
105 118
106 if (timeout != -1) 119 } else {
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start)); 120 // signal fetched
108 121 return true;
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие 122 }
110 // и уйдет на пустой цикл, после чего заблокируется 123
111 124
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 } 125 }
117 126
118 bool Sleep(int timeout) { 127 bool Sleep(int timeout) {
119 Interlocked.Increment(ref m_sleepingThreads); 128 Interlocked.Increment(ref m_sleepingThreads);
120 if (FetchSignalOrWait(timeout)) { 129 if (FetchSignalOrWait(timeout)) {
129 138
130 /// <summary> 139 /// <summary>
131 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока 140 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
132 /// </summary> 141 /// </summary>
133 protected void GrowPool() { 142 protected void GrowPool() {
134 if (m_exitRequired != 0) 143 Thread.MemoryBarrier();
144 if (m_exitRequired == 1)
135 return; 145 return;
136 if (m_sleepingThreads > m_wakeEvents) { 146 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); 147 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138 148
139 // all sleeping threads may be gone 149 // all sleeping threads may be gone
202 bool AllocateThreadSlot() { 212 bool AllocateThreadSlot() {
203 int current; 213 int current;
204 // use spins to allocate slot for the new thread 214 // use spins to allocate slot for the new thread
205 do { 215 do {
206 current = m_createdThreads; 216 current = m_createdThreads;
207 if (current >= m_maxThreads || m_exitRequired != 0) 217 if (current >= m_maxThreads || m_exitRequired == 1)
208 // no more slots left or the pool has been disposed 218 // no more slots left or the pool has been disposed
209 return false; 219 return false;
210 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); 220 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
211 221
212 UpdateMaxThreads(current + 1); 222 UpdateMaxThreads(current + 1);
225 235
226 bool ReleaseThreadSlot(out bool last) { 236 bool ReleaseThreadSlot(out bool last) {
227 last = false; 237 last = false;
228 int current; 238 int current;
229 // use spins to release slot for the new thread 239 // use spins to release slot for the new thread
240 Thread.MemoryBarrier();
230 do { 241 do {
231 current = m_createdThreads; 242 current = m_createdThreads;
232 if (current <= m_minThreads && m_exitRequired == 0) 243 if (current <= m_minThreads && m_exitRequired == 0)
233 // the thread is reserved 244 // the thread is reserved
234 return false; 245 return false;
262 bool StartWorker() { 273 bool StartWorker() {
263 if (AllocateThreadSlot()) { 274 if (AllocateThreadSlot()) {
264 // slot successfully allocated 275 // slot successfully allocated
265 var worker = new Thread(this.Worker); 276 var worker = new Thread(this.Worker);
266 worker.IsBackground = true; 277 worker.IsBackground = true;
278 Interlocked.Increment(ref m_activeThreads);
267 worker.Start(); 279 worker.Start();
268 280
269 return true; 281 return true;
270 } else { 282 } else {
271 return false; 283 return false;
275 protected abstract void InvokeUnit(TUnit unit); 287 protected abstract void InvokeUnit(TUnit unit);
276 288
277 protected virtual void Worker() { 289 protected virtual void Worker() {
278 TUnit unit; 290 TUnit unit;
279 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); 291 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
280 Interlocked.Increment(ref m_activeThreads); 292 int count = 0;;
293 Thread.MemoryBarrier();
281 do { 294 do {
282 // exit if requested 295 // exit if requested
283 if (m_exitRequired != 0) { 296 if (m_exitRequired == 1) {
284 // release the thread slot 297 // release the thread slot
285 Interlocked.Decrement(ref m_activeThreads); 298 Interlocked.Decrement(ref m_activeThreads);
286 if (ReleaseThreadSlotAnyway()) // it was the last worker 299 if (!ReleaseThreadSlotAnyway()) // it was the last worker
287 m_hasTasks.Dispose();
288 else
289 SignalThread(); // wake next worker 300 SignalThread(); // wake next worker
290 break; 301 break;
291 } 302 }
292 303
293 // fetch task 304 // fetch task
294 if (TryDequeue(out unit)) { 305 if (TryDequeue(out unit)) {
295 InvokeUnit(unit); 306 InvokeUnit(unit);
307 count ++;
296 continue; 308 continue;
297 } 309 }
298 Interlocked.Decrement(ref m_activeThreads); 310 Interlocked.Decrement(ref m_activeThreads);
299 311
312 Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
300 // entering suspend state 313 // entering suspend state
301 // keep this thread and wait 314 // keep this thread and wait
302 if (!Suspend()) 315 if (!Suspend())
303 break; 316 break;
317 count = 0;
304 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); 318 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
305 Interlocked.Increment(ref m_activeThreads); 319 Interlocked.Increment(ref m_activeThreads);
306 } while (true); 320 } while (true);
307 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); 321 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
308 } 322 }
309 323
310 protected virtual void Dispose(bool disposing) { 324 protected virtual void Dispose(bool disposing) {
311 if (disposing) { 325 if (disposing) {
312 if (m_exitRequired == 0) { 326 if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
313 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
314 return;
315
316 // wake sleeping threads 327 // wake sleeping threads
317 if (m_createdThreads > 0) 328 if (m_createdThreads > 0)
318 SignalThread(); 329 SignalThread();
319 else
320 m_hasTasks.Dispose();
321 GC.SuppressFinalize(this); 330 GC.SuppressFinalize(this);
322 } 331 }
323 } 332 }
324 } 333 }
325 334