comparison Implab/Parallels/DispatchPool.cs @ 21:6a56df4ec59e promises

DispatchPool works again, but performance is poor in some cases
author cin
date Tue, 12 Nov 2013 19:52:10 +0400
parents 1c3b3d518480
children 5a35900264f5
comparison
equal deleted inserted replaced
20:1c3b3d518480 21:6a56df4ec59e
7 7
8 namespace Implab.Parallels { 8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable { 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads; 10 readonly int m_minThreads;
11 readonly int m_maxThreads; 11 readonly int m_maxThreads;
12 int m_createdThreads = 0; 12
13 int m_activeThreads = 0; 13 int m_createdThreads = 0; // the current size of the pool
14 int m_sleepingThreads = 0; 14 int m_activeThreads = 0; // the count of threads which are active
15 int m_maxRunningThreads = 0; 15 int m_sleepingThreads = 0; // the count of currently inactive threads
16 int m_exitRequired = 0; 16 int m_maxRunningThreads = 0; // the meximum reached size of the pool
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit 17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
19 int m_wakeEvents = 0; // the count of wake events
20
18 AutoResetEvent m_hasTasks = new AutoResetEvent(false); 21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
19 22
20 protected DispatchPool(int min, int max) { 23 protected DispatchPool(int min, int max) {
21 if (min < 0) 24 if (min < 0)
22 throw new ArgumentOutOfRangeException("min"); 25 throw new ArgumentOutOfRangeException("min");
70 } 73 }
71 } 74 }
72 75
73 protected abstract bool TryDequeue(out TUnit unit); 76 protected abstract bool TryDequeue(out TUnit unit);
74 77
75 protected virtual bool ExtendPool() { 78 #region thread execution traits
76 if (m_sleepingThreads == 0) 79 int SignalThread() {
77 // no sleeping workers are available 80 var signals = Interlocked.Increment(ref m_wakeEvents);
78 // try create one 81 if(signals == 1)
79 return StartWorker(); 82 m_hasTasks.Set();
80 else { 83 return signals;
81 // we can get here a race condition when several threads asks to extend pool 84 }
82 // and some sleaping threads are exited due timeout but they are still counted as sleeping 85
83 // in that case all of this threads could exit except one 86 bool Sleep(int timeout) {
84 WakePool(); 87 Interlocked.Increment(ref m_sleepingThreads);
88 if (m_hasTasks.WaitOne(timeout)) {
89 // this is autoreset event, only one thread can run this block simultaneously
90 var sleeping = Interlocked.Decrement(ref m_sleepingThreads);
91 if (Interlocked.Decrement(ref m_wakeEvents) > 0)
92 m_hasTasks.Set(); // wake next worker
93
85 return true; 94 return true;
86 } 95 } else {
87 96 Interlocked.Decrement(ref m_sleepingThreads);
88 } 97 return false;
98 }
99 }
100 #endregion
89 101
90 /// <summary> 102 /// <summary>
91 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока 103 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
92 /// </summary> 104 /// </summary>
93 protected void WakePool() { 105 protected void GrowPool() {
94 m_hasTasks.Set(); // wake sleeping thread; 106 if (m_exitRequired != 0)
95 107 return;
96 if (AllocateThreadSlot(1)) { 108 if (m_sleepingThreads > m_wakeEvents) {
97 // if there were no threads in the pool 109 // all sleeping threads may gone
98 var worker = new Thread(this.Worker); 110 SignalThread(); // wake a sleeping thread;
99 worker.IsBackground = true; 111
100 worker.Start(); 112 // we can't check whether signal has been processed
101 } 113 // anyway it may take some time for the thread to start
102 } 114 // we will ensure that at least one thread is running
103 115
104 bool Sleep(int timeout) { 116 if (AllocateThreadSlot(1)) {
105 Interlocked.Increment(ref m_sleepingThreads); 117 // if there were no threads in the pool
106 var result = m_hasTasks.WaitOne(timeout); 118 var worker = new Thread(this.Worker);
107 Interlocked.Decrement(ref m_sleepingThreads); 119 worker.IsBackground = true;
108 return result; 120 worker.Start();
109 } 121 }
110 122 } else {
111 protected virtual bool Suspend() { 123 // if there is no sleeping threads in the pool
124 StartWorker();
125 }
126 }
127
128 private bool Suspend() {
112 //no tasks left, exit if the thread is no longer needed 129 //no tasks left, exit if the thread is no longer needed
113 bool last; 130 bool last;
114 bool requestExit; 131 bool requestExit;
115 132
133
134
135 // if threads have a timeout before releasing
116 if (m_releaseTimeout > 0) 136 if (m_releaseTimeout > 0)
117 requestExit = !Sleep(m_releaseTimeout); 137 requestExit = !Sleep(m_releaseTimeout);
118 else 138 else
119 requestExit = true; 139 requestExit = true;
120 140
121 141 if (!requestExit)
142 return true;
143
144 // release unsused thread
122 if (requestExit && ReleaseThreadSlot(out last)) { 145 if (requestExit && ReleaseThreadSlot(out last)) {
123 // in case at the moment the last thread was being released 146 // in case at the moment the last thread was being released
124 // a new task was added to the queue, we need to try 147 // a new task was added to the queue, we need to try
125 // to revoke the thread to avoid the situation when the task is left unprocessed 148 // to revoke the thread to avoid the situation when the task is left unprocessed
126 if (last && m_hasTasks.WaitOne(0)) { 149 if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
127 if (AllocateThreadSlot(1)) 150 if (AllocateThreadSlot(1))
128 return true; // spin again... 151 return true; // spin again...
129 else 152 else
130 // we failed to reallocate the first slot for this thread 153 SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
131 // therefore we need to release the event 154
132 m_hasTasks.Set();
133 } 155 }
134 156
135 return false; 157 return false;
136 } 158 }
137 159
160 // wait till infinity
138 Sleep(-1); 161 Sleep(-1);
139 162
140 return true; 163 return true;
141 } 164 }
142 165
213 } else { 236 } else {
214 return false; 237 return false;
215 } 238 }
216 } 239 }
217 240
218 bool FetchTask(out TUnit unit) { 241 protected abstract void InvokeUnit(TUnit unit);
242
243 void Worker() {
244 TUnit unit;
245 Interlocked.Increment(ref m_activeThreads);
246 Sleep(0); // remove wake request if the new thread is started
219 do { 247 do {
220 // exit if requested 248 // exit if requested
221 if (m_exitRequired != 0) { 249 if (m_exitRequired != 0) {
222 // release the thread slot 250 // release the thread slot
223 Interlocked.Decrement(ref m_activeThreads); 251 Interlocked.Decrement(ref m_activeThreads);
224 if (ReleaseThreadSlotAnyway()) // it was the last worker 252 if (ReleaseThreadSlotAnyway()) // it was the last worker
225 m_hasTasks.Dispose(); 253 m_hasTasks.Dispose();
226 else 254 else
227 m_hasTasks.Set(); // wake next worker 255 SignalThread(); // wake next worker
228 unit = default(TUnit); 256 unit = default(TUnit);
229 return false; 257 break;
230 } 258 }
231 259
232 // fetch task 260 // fetch task
233 if (TryDequeue(out unit)) { 261 if (TryDequeue(out unit)) {
234 ExtendPool(); 262 InvokeUnit(unit);
235 return true; 263 continue;
236 } 264 }
237 265
238 Interlocked.Decrement(ref m_activeThreads); 266 Interlocked.Decrement(ref m_activeThreads);
239 267
240 // entering suspend state 268 // entering suspend state
241 // keep this thread and wait 269 // keep this thread and wait
242 if (!Suspend()) 270 if (!Suspend())
243 return false; 271 break;
244 272
245 Interlocked.Increment(ref m_activeThreads); 273 Interlocked.Increment(ref m_activeThreads);
246 } while (true); 274 } while (true);
247 } 275
248
249 protected abstract void InvokeUnit(TUnit unit);
250
251 void Worker() {
252 TUnit unit;
253 Interlocked.Increment(ref m_activeThreads);
254 while (FetchTask(out unit))
255 InvokeUnit(unit);
256 } 276 }
257 277
258 protected virtual void Dispose(bool disposing) { 278 protected virtual void Dispose(bool disposing) {
259 if (disposing) { 279 if (disposing) {
260 if (m_exitRequired == 0) { 280 if (m_exitRequired == 0) {
261 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) 281 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
262 return; 282 return;
263 283
264 // wake sleeping threads 284 // wake sleeping threads
265 m_hasTasks.Set(); 285 if (m_createdThreads > 0)
286 SignalThread();
287 else
288 m_hasTasks.Dispose();
266 GC.SuppressFinalize(this); 289 GC.SuppressFinalize(this);
267 } 290 }
268 } 291 }
269 } 292 }
270 293