Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 21:6a56df4ec59e promises
DispatchPool works again, but performance is poor in some cases
author | cin |
---|---|
date | Tue, 12 Nov 2013 19:52:10 +0400 |
parents | 1c3b3d518480 |
children | 5a35900264f5 |
comparison
equal
deleted
inserted
replaced
20:1c3b3d518480 | 21:6a56df4ec59e |
---|---|
7 | 7 |
8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
9 public abstract class DispatchPool<TUnit> : IDisposable { | 9 public abstract class DispatchPool<TUnit> : IDisposable { |
10 readonly int m_minThreads; | 10 readonly int m_minThreads; |
11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
12 int m_createdThreads = 0; | 12 |
13 int m_activeThreads = 0; | 13 int m_createdThreads = 0; // the current size of the pool |
14 int m_sleepingThreads = 0; | 14 int m_activeThreads = 0; // the count of threads which are active |
15 int m_maxRunningThreads = 0; | 15 int m_sleepingThreads = 0; // the count of currently inactive threads |
16 int m_exitRequired = 0; | 16 int m_maxRunningThreads = 0; // the maximum reached size of the pool |
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit | 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released |
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit | |
19 int m_wakeEvents = 0; // the count of wake events | |
20 | |
18 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false); |
19 | 22 |
20 protected DispatchPool(int min, int max) { | 23 protected DispatchPool(int min, int max) { |
21 if (min < 0) | 24 if (min < 0) |
22 throw new ArgumentOutOfRangeException("min"); | 25 throw new ArgumentOutOfRangeException("min"); |
70 } | 73 } |
71 } | 74 } |
72 | 75 |
73 protected abstract bool TryDequeue(out TUnit unit); | 76 protected abstract bool TryDequeue(out TUnit unit); |
74 | 77 |
75 protected virtual bool ExtendPool() { | 78 #region thread execution traits |
76 if (m_sleepingThreads == 0) | 79 int SignalThread() { |
77 // no sleeping workers are available | 80 var signals = Interlocked.Increment(ref m_wakeEvents); |
78 // try create one | 81 if(signals == 1) |
79 return StartWorker(); | 82 m_hasTasks.Set(); |
80 else { | 83 return signals; |
81 // we can get here a race condition when several threads ask to extend pool | 84 } |
82 // and some sleeping threads are exited due to timeout but they are still counted as sleeping | 85 |
83 // in that case all of these threads could exit except one | 86 bool Sleep(int timeout) { |
84 WakePool(); | 87 Interlocked.Increment(ref m_sleepingThreads); |
88 if (m_hasTasks.WaitOne(timeout)) { | |
89 // this is autoreset event, only one thread can run this block simultaneously | |
90 var sleeping = Interlocked.Decrement(ref m_sleepingThreads); | |
91 if (Interlocked.Decrement(ref m_wakeEvents) > 0) | |
92 m_hasTasks.Set(); // wake next worker | |
93 | |
85 return true; | 94 return true; |
86 } | 95 } else { |
87 | 96 Interlocked.Decrement(ref m_sleepingThreads); |
88 } | 97 return false; |
98 } | |
99 } | |
100 #endregion | |
89 | 101 |
90 /// <summary> | 102 /// <summary> |
91 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | 103 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока |
92 /// </summary> | 104 /// </summary> |
93 protected void WakePool() { | 105 protected void GrowPool() { |
94 m_hasTasks.Set(); // wake sleeping thread; | 106 if (m_exitRequired != 0) |
95 | 107 return; |
96 if (AllocateThreadSlot(1)) { | 108 if (m_sleepingThreads > m_wakeEvents) { |
97 // if there were no threads in the pool | 109 // all sleeping threads may be gone |
98 var worker = new Thread(this.Worker); | 110 SignalThread(); // wake a sleeping thread; |
99 worker.IsBackground = true; | 111 |
100 worker.Start(); | 112 // we can't check whether signal has been processed |
101 } | 113 // anyway it may take some time for the thread to start |
102 } | 114 // we will ensure that at least one thread is running |
103 | 115 |
104 bool Sleep(int timeout) { | 116 if (AllocateThreadSlot(1)) { |
105 Interlocked.Increment(ref m_sleepingThreads); | 117 // if there were no threads in the pool |
106 var result = m_hasTasks.WaitOne(timeout); | 118 var worker = new Thread(this.Worker); |
107 Interlocked.Decrement(ref m_sleepingThreads); | 119 worker.IsBackground = true; |
108 return result; | 120 worker.Start(); |
109 } | 121 } |
110 | 122 } else { |
111 protected virtual bool Suspend() { | 123 // if there are no sleeping threads in the pool |
124 StartWorker(); | |
125 } | |
126 } | |
127 | |
128 private bool Suspend() { | |
112 //no tasks left, exit if the thread is no longer needed | 129 //no tasks left, exit if the thread is no longer needed |
113 bool last; | 130 bool last; |
114 bool requestExit; | 131 bool requestExit; |
115 | 132 |
133 | |
134 | |
135 // if threads have a timeout before releasing | |
116 if (m_releaseTimeout > 0) | 136 if (m_releaseTimeout > 0) |
117 requestExit = !Sleep(m_releaseTimeout); | 137 requestExit = !Sleep(m_releaseTimeout); |
118 else | 138 else |
119 requestExit = true; | 139 requestExit = true; |
120 | 140 |
121 | 141 if (!requestExit) |
142 return true; | |
143 | |
144 // release unused thread | 
122 if (requestExit && ReleaseThreadSlot(out last)) { | 145 if (requestExit && ReleaseThreadSlot(out last)) { |
123 // in case at the moment the last thread was being released | 146 // in case at the moment the last thread was being released |
124 // a new task was added to the queue, we need to try | 147 // a new task was added to the queue, we need to try |
125 // to revoke the thread to avoid the situation when the task is left unprocessed | 148 // to revoke the thread to avoid the situation when the task is left unprocessed |
126 if (last && m_hasTasks.WaitOne(0)) { | 149 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false |
127 if (AllocateThreadSlot(1)) | 150 if (AllocateThreadSlot(1)) |
128 return true; // spin again... | 151 return true; // spin again... |
129 else | 152 else |
130 // we failed to reallocate the first slot for this thread | 153 SignalThread(); // since Sleep(0) has fetched the signal we need to reschedule it |
131 // therefore we need to release the event | 154 |
132 m_hasTasks.Set(); | |
133 } | 155 } |
134 | 156 |
135 return false; | 157 return false; |
136 } | 158 } |
137 | 159 |
160 // wait till infinity | |
138 Sleep(-1); | 161 Sleep(-1); |
139 | 162 |
140 return true; | 163 return true; |
141 } | 164 } |
142 | 165 |
213 } else { | 236 } else { |
214 return false; | 237 return false; |
215 } | 238 } |
216 } | 239 } |
217 | 240 |
218 bool FetchTask(out TUnit unit) { | 241 protected abstract void InvokeUnit(TUnit unit); |
242 | |
243 void Worker() { | |
244 TUnit unit; | |
245 Interlocked.Increment(ref m_activeThreads); | |
246 Sleep(0); // remove wake request if the new thread is started | |
219 do { | 247 do { |
220 // exit if requested | 248 // exit if requested |
221 if (m_exitRequired != 0) { | 249 if (m_exitRequired != 0) { |
222 // release the thread slot | 250 // release the thread slot |
223 Interlocked.Decrement(ref m_activeThreads); | 251 Interlocked.Decrement(ref m_activeThreads); |
224 if (ReleaseThreadSlotAnyway()) // it was the last worker | 252 if (ReleaseThreadSlotAnyway()) // it was the last worker |
225 m_hasTasks.Dispose(); | 253 m_hasTasks.Dispose(); |
226 else | 254 else |
227 m_hasTasks.Set(); // wake next worker | 255 SignalThread(); // wake next worker |
228 unit = default(TUnit); | 256 unit = default(TUnit); |
229 return false; | 257 break; |
230 } | 258 } |
231 | 259 |
232 // fetch task | 260 // fetch task |
233 if (TryDequeue(out unit)) { | 261 if (TryDequeue(out unit)) { |
234 ExtendPool(); | 262 InvokeUnit(unit); |
235 return true; | 263 continue; |
236 } | 264 } |
237 | 265 |
238 Interlocked.Decrement(ref m_activeThreads); | 266 Interlocked.Decrement(ref m_activeThreads); |
239 | 267 |
240 // entering suspend state | 268 // entering suspend state |
241 // keep this thread and wait | 269 // keep this thread and wait |
242 if (!Suspend()) | 270 if (!Suspend()) |
243 return false; | 271 break; |
244 | 272 |
245 Interlocked.Increment(ref m_activeThreads); | 273 Interlocked.Increment(ref m_activeThreads); |
246 } while (true); | 274 } while (true); |
247 } | 275 |
248 | |
249 protected abstract void InvokeUnit(TUnit unit); | |
250 | |
251 void Worker() { | |
252 TUnit unit; | |
253 Interlocked.Increment(ref m_activeThreads); | |
254 while (FetchTask(out unit)) | |
255 InvokeUnit(unit); | |
256 } | 276 } |
257 | 277 |
258 protected virtual void Dispose(bool disposing) { | 278 protected virtual void Dispose(bool disposing) { |
259 if (disposing) { | 279 if (disposing) { |
260 if (m_exitRequired == 0) { | 280 if (m_exitRequired == 0) { |
261 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | 281 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) |
262 return; | 282 return; |
263 | 283 |
264 // wake sleeping threads | 284 // wake sleeping threads |
265 m_hasTasks.Set(); | 285 if (m_createdThreads > 0) |
286 SignalThread(); | |
287 else | |
288 m_hasTasks.Dispose(); | |
266 GC.SuppressFinalize(this); | 289 GC.SuppressFinalize(this); |
267 } | 290 } |
268 } | 291 } |
269 } | 292 } |
270 | 293 |