Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 20:1c3b3d518480 promises
refactoring, sync
| author | cin |
|---|---|
| date | Tue, 12 Nov 2013 02:27:22 +0400 |
| parents | 7cd4a843b4e4 |
| children | 6a56df4ec59e |
comparison
equal
deleted
inserted
replaced
| 19:e3935fdf59a2 | 20:1c3b3d518480 |
|---|---|
| 7 | 7 |
| 8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
| 9 public abstract class DispatchPool<TUnit> : IDisposable { | 9 public abstract class DispatchPool<TUnit> : IDisposable { |
| 10 readonly int m_minThreads; | 10 readonly int m_minThreads; |
| 11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
| 12 int m_runningThreads = 0; | 12 int m_createdThreads = 0; |
| 13 int m_activeThreads = 0; | |
| 14 int m_sleepingThreads = 0; | |
| 13 int m_maxRunningThreads = 0; | 15 int m_maxRunningThreads = 0; |
| 14 int m_suspended = 0; | |
| 15 int m_exitRequired = 0; | 16 int m_exitRequired = 0; |
| 17 int m_releaseTimeout = 100; // how long an idle worker thread waits for new tasks before it exits | |
| 16 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | 18 AutoResetEvent m_hasTasks = new AutoResetEvent(false); |
| 17 | 19 |
| 18 protected DispatchPool(int min, int max) { | 20 protected DispatchPool(int min, int max) { |
| 19 if (min < 0) | 21 if (min < 0) |
| 20 throw new ArgumentOutOfRangeException("min"); | 22 throw new ArgumentOutOfRangeException("min"); |
| 42 protected void InitPool() { | 44 protected void InitPool() { |
| 43 for (int i = 0; i < m_minThreads; i++) | 45 for (int i = 0; i < m_minThreads; i++) |
| 44 StartWorker(); | 46 StartWorker(); |
| 45 } | 47 } |
| 46 | 48 |
| 47 public int ThreadCount { | 49 public int PoolSize { |
| 48 get { | 50 get { |
| 49 return m_runningThreads; | 51 return m_createdThreads; |
| 52 } | |
| 53 } | |
| 54 | |
| 55 public int ActiveThreads { | |
| 56 get { | |
| 57 return m_activeThreads; | |
| 50 } | 58 } |
| 51 } | 59 } |
| 52 | 60 |
| 53 public int MaxRunningThreads { | 61 public int MaxRunningThreads { |
| 54 get { | 62 get { |
| 63 } | 71 } |
| 64 | 72 |
| 65 protected abstract bool TryDequeue(out TUnit unit); | 73 protected abstract bool TryDequeue(out TUnit unit); |
| 66 | 74 |
| 67 protected virtual bool ExtendPool() { | 75 protected virtual bool ExtendPool() { |
| 68 if (m_suspended > 0) { | 76 if (m_sleepingThreads == 0) |
| 69 m_hasTasks.Set(); | 77 // no sleeping workers are available |
| 78 // try create one | |
| 79 return StartWorker(); | |
| 80 else { | |
| 81 // a race condition is possible here when several threads ask to extend the pool | |
| 82 // and some sleeping threads have exited due to timeout but are still counted as sleeping; | |
| 83 // in that case all of these threads could exit except one | |
| 84 WakePool(); | |
| 70 return true; | 85 return true; |
| 71 } else | 86 } |
| 72 return StartWorker(); | 87 |
| 73 } | 88 } |
| 74 | 89 |
| 75 /// <summary> | 90 /// <summary> |
| 76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждения одного спящего потока | 91 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждения одного спящего потока |
| 77 /// </summary> | 92 /// </summary> |
| 78 protected void WakePool() { | 93 protected void WakePool() { |
| 79 m_hasTasks.Set(); // wake sleeping thread; | 94 m_hasTasks.Set(); // wake sleeping thread; |
| 80 | 95 |
| 81 if (AllocateThreadSlot(1)) { | 96 if (AllocateThreadSlot(1)) { |
| 97 // if there were no threads in the pool | |
| 82 var worker = new Thread(this.Worker); | 98 var worker = new Thread(this.Worker); |
| 83 worker.IsBackground = true; | 99 worker.IsBackground = true; |
| 84 worker.Start(); | 100 worker.Start(); |
| 85 } | 101 } |
| 86 } | 102 } |
| 87 | 103 |
| 88 protected virtual void Suspend() { | 104 bool Sleep(int timeout) { |
| 89 m_hasTasks.WaitOne(); | 105 Interlocked.Increment(ref m_sleepingThreads); |
| 106 var result = m_hasTasks.WaitOne(timeout); | |
| 107 Interlocked.Decrement(ref m_sleepingThreads); | |
| 108 return result; | |
| 109 } | |
| 110 | |
| 111 protected virtual bool Suspend() { | |
| 112 //no tasks left, exit if the thread is no longer needed | |
| 113 bool last; | |
| 114 bool requestExit; | |
| 115 | |
| 116 if (m_releaseTimeout > 0) | |
| 117 requestExit = !Sleep(m_releaseTimeout); | |
| 118 else | |
| 119 requestExit = true; | |
| 120 | |
| 121 | |
| 122 if (requestExit && ReleaseThreadSlot(out last)) { | |
| 123 // if a new task was added to the queue at the moment | |
| 124 // the last thread was being released, we need to try | |
| 125 // to revive this thread so that the task is not left unprocessed | |
| 126 if (last && m_hasTasks.WaitOne(0)) { | |
| 127 if (AllocateThreadSlot(1)) | |
| 128 return true; // spin again... | |
| 129 else | |
| 130 // we failed to reallocate the first slot for this thread | |
| 131 // therefore we need to release the event | |
| 132 m_hasTasks.Set(); | |
| 133 } | |
| 134 | |
| 135 return false; | |
| 136 } | |
| 137 | |
| 138 Sleep(-1); | |
| 139 | |
| 140 return true; | |
| 90 } | 141 } |
| 91 | 142 |
| 92 #region thread slots traits | 143 #region thread slots traits |
| 93 | 144 |
| 94 bool AllocateThreadSlot() { | 145 bool AllocateThreadSlot() { |
| 95 int current; | 146 int current; |
| 96 // use spins to allocate slot for the new thread | 147 // use spins to allocate slot for the new thread |
| 97 do { | 148 do { |
| 98 current = m_runningThreads; | 149 current = m_createdThreads; |
| 99 if (current >= m_maxThreads || m_exitRequired != 0) | 150 if (current >= m_maxThreads || m_exitRequired != 0) |
| 100 // no more slots left or the pool has been disposed | 151 // no more slots left or the pool has been disposed |
| 101 return false; | 152 return false; |
| 102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | 153 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
| 103 | 154 |
| 104 UpdateMaxThreads(current + 1); | 155 UpdateMaxThreads(current + 1); |
| 105 | 156 |
| 106 return true; | 157 return true; |
| 107 } | 158 } |
| 108 | 159 |
| 109 bool AllocateThreadSlot(int desired) { | 160 bool AllocateThreadSlot(int desired) { |
| 110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) | 161 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) |
| 111 return false; | 162 return false; |
| 112 | 163 |
| 113 UpdateMaxThreads(desired); | 164 UpdateMaxThreads(desired); |
| 114 | 165 |
| 115 return true; | 166 return true; |
| 118 bool ReleaseThreadSlot(out bool last) { | 169 bool ReleaseThreadSlot(out bool last) { |
| 119 last = false; | 170 last = false; |
| 120 int current; | 171 int current; |
| 121 // use spins to release a thread slot | 172 // use spins to release a thread slot |
| 122 do { | 173 do { |
| 123 current = m_runningThreads; | 174 current = m_createdThreads; |
| 124 if (current <= m_minThreads && m_exitRequired == 0) | 175 if (current <= m_minThreads && m_exitRequired == 0) |
| 125 // the thread is reserved | 176 // the thread is reserved |
| 126 return false; | 177 return false; |
| 127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); | 178 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); |
| 128 | 179 |
| 129 last = (current == 1); | 180 last = (current == 1); |
| 130 | 181 |
| 131 return true; | 182 return true; |
| 132 } | 183 } |
| 134 /// <summary> | 185 /// <summary> |
| 135 /// releases thread slot unconditionally, used during cleanup | 186 /// releases thread slot unconditionally, used during cleanup |
| 136 /// </summary> | 187 /// </summary> |
| 137 /// <returns>true - no more threads left</returns> | 188 /// <returns>true - no more threads left</returns> |
| 138 bool ReleaseThreadSlotAnyway() { | 189 bool ReleaseThreadSlotAnyway() { |
| 139 var left = Interlocked.Decrement(ref m_runningThreads); | 190 var left = Interlocked.Decrement(ref m_createdThreads); |
| 140 return left == 0; | 191 return left == 0; |
| 141 } | 192 } |
| 142 | 193 |
| 143 void UpdateMaxThreads(int count) { | 194 void UpdateMaxThreads(int count) { |
| 144 int max; | 195 int max; |
| 167 bool FetchTask(out TUnit unit) { | 218 bool FetchTask(out TUnit unit) { |
| 168 do { | 219 do { |
| 169 // exit if requested | 220 // exit if requested |
| 170 if (m_exitRequired != 0) { | 221 if (m_exitRequired != 0) { |
| 171 // release the thread slot | 222 // release the thread slot |
| 223 Interlocked.Decrement(ref m_activeThreads); | |
| 172 if (ReleaseThreadSlotAnyway()) // it was the last worker | 224 if (ReleaseThreadSlotAnyway()) // it was the last worker |
| 173 m_hasTasks.Dispose(); | 225 m_hasTasks.Dispose(); |
| 174 else | 226 else |
| 175 m_hasTasks.Set(); // wake next worker | 227 m_hasTasks.Set(); // wake next worker |
| 176 unit = default(TUnit); | 228 unit = default(TUnit); |
| 181 if (TryDequeue(out unit)) { | 233 if (TryDequeue(out unit)) { |
| 182 ExtendPool(); | 234 ExtendPool(); |
| 183 return true; | 235 return true; |
| 184 } | 236 } |
| 185 | 237 |
| 186 //no tasks left, exit if the thread is no longer needed | 238 Interlocked.Decrement(ref m_activeThreads); |
| 187 bool last; | |
| 188 if (ReleaseThreadSlot(out last)) { | |
| 189 if (last && m_hasTasks.WaitOne(0)) { | |
| 190 if (AllocateThreadSlot(1)) | |
| 191 continue; // spin again... | |
| 192 else | |
| 193 // we failed to reallocate slot for this thread | |
| 194 // therefore we need to release the event | |
| 195 m_hasTasks.Set(); | |
| 196 } | |
| 197 | |
| 198 return false; | |
| 199 } | |
| 200 | 239 |
| 201 // entering suspend state | 240 // entering suspend state |
| 202 Interlocked.Increment(ref m_suspended); | |
| 203 // keep this thread and wait | 241 // keep this thread and wait |
| 204 Suspend(); | 242 if (!Suspend()) |
| 205 Interlocked.Decrement(ref m_suspended); | 243 return false; |
| 244 | |
| 245 Interlocked.Increment(ref m_activeThreads); | |
| 206 } while (true); | 246 } while (true); |
| 207 } | 247 } |
| 208 | 248 |
| 209 protected abstract void InvokeUnit(TUnit unit); | 249 protected abstract void InvokeUnit(TUnit unit); |
| 210 | 250 |
| 211 void Worker() { | 251 void Worker() { |
| 212 TUnit unit; | 252 TUnit unit; |
| 253 Interlocked.Increment(ref m_activeThreads); | |
| 213 while (FetchTask(out unit)) | 254 while (FetchTask(out unit)) |
| 214 InvokeUnit(unit); | 255 InvokeUnit(unit); |
| 215 } | 256 } |
| 216 | 257 |
| 217 protected virtual void Dispose(bool disposing) { | 258 protected virtual void Dispose(bool disposing) { |
