Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 20:1c3b3d518480 promises
refactoring, sync
author | cin |
---|---|
date | Tue, 12 Nov 2013 02:27:22 +0400 |
parents | 7cd4a843b4e4 |
children | 6a56df4ec59e |
comparison
equal
deleted
inserted
replaced
19:e3935fdf59a2 | 20:1c3b3d518480 |
---|---|
7 | 7 |
8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
9 public abstract class DispatchPool<TUnit> : IDisposable { | 9 public abstract class DispatchPool<TUnit> : IDisposable { |
10 readonly int m_minThreads; | 10 readonly int m_minThreads; |
11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
12 int m_runningThreads = 0; | 12 int m_createdThreads = 0; |
13 int m_activeThreads = 0; | |
14 int m_sleepingThreads = 0; | |
13 int m_maxRunningThreads = 0; | 15 int m_maxRunningThreads = 0; |
14 int m_suspended = 0; | |
15 int m_exitRequired = 0; | 16 int m_exitRequired = 0; |
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit | |
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | 18 AutoResetEvent m_hasTasks = new AutoResetEvent(false); |
17 | 19 |
18 protected DispatchPool(int min, int max) { | 20 protected DispatchPool(int min, int max) { |
19 if (min < 0) | 21 if (min < 0) |
20 throw new ArgumentOutOfRangeException("min"); | 22 throw new ArgumentOutOfRangeException("min"); |
42 protected void InitPool() { | 44 protected void InitPool() { |
43 for (int i = 0; i < m_minThreads; i++) | 45 for (int i = 0; i < m_minThreads; i++) |
44 StartWorker(); | 46 StartWorker(); |
45 } | 47 } |
46 | 48 |
47 public int ThreadCount { | 49 public int PoolSize { |
48 get { | 50 get { |
49 return m_runningThreads; | 51 return m_createdThreads; |
52 } | |
53 } | |
54 | |
55 public int ActiveThreads { | |
56 get { | |
57 return m_activeThreads; | |
50 } | 58 } |
51 } | 59 } |
52 | 60 |
53 public int MaxRunningThreads { | 61 public int MaxRunningThreads { |
54 get { | 62 get { |
63 } | 71 } |
64 | 72 |
65 protected abstract bool TryDequeue(out TUnit unit); | 73 protected abstract bool TryDequeue(out TUnit unit); |
66 | 74 |
67 protected virtual bool ExtendPool() { | 75 protected virtual bool ExtendPool() { |
68 if (m_suspended > 0) { | 76 if (m_sleepingThreads == 0) |
69 m_hasTasks.Set(); | 77 // no sleeping workers are available |
78 // try create one | |
79 return StartWorker(); | |
80 else { | |
81 // we can get here a race condition when several threads asks to extend pool | |
82 // and some sleaping threads are exited due timeout but they are still counted as sleeping | |
83 // in that case all of this threads could exit except one | |
84 WakePool(); | |
70 return true; | 85 return true; |
71 } else | 86 } |
72 return StartWorker(); | 87 |
73 } | 88 } |
74 | 89 |
75 /// <summary> | 90 /// <summary> |
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | 91 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока |
77 /// </summary> | 92 /// </summary> |
78 protected void WakePool() { | 93 protected void WakePool() { |
79 m_hasTasks.Set(); // wake sleeping thread; | 94 m_hasTasks.Set(); // wake sleeping thread; |
80 | 95 |
81 if (AllocateThreadSlot(1)) { | 96 if (AllocateThreadSlot(1)) { |
97 // if there were no threads in the pool | |
82 var worker = new Thread(this.Worker); | 98 var worker = new Thread(this.Worker); |
83 worker.IsBackground = true; | 99 worker.IsBackground = true; |
84 worker.Start(); | 100 worker.Start(); |
85 } | 101 } |
86 } | 102 } |
87 | 103 |
88 protected virtual void Suspend() { | 104 bool Sleep(int timeout) { |
89 m_hasTasks.WaitOne(); | 105 Interlocked.Increment(ref m_sleepingThreads); |
106 var result = m_hasTasks.WaitOne(timeout); | |
107 Interlocked.Decrement(ref m_sleepingThreads); | |
108 return result; | |
109 } | |
110 | |
111 protected virtual bool Suspend() { | |
112 //no tasks left, exit if the thread is no longer needed | |
113 bool last; | |
114 bool requestExit; | |
115 | |
116 if (m_releaseTimeout > 0) | |
117 requestExit = !Sleep(m_releaseTimeout); | |
118 else | |
119 requestExit = true; | |
120 | |
121 | |
122 if (requestExit && ReleaseThreadSlot(out last)) { | |
123 // in case at the moment the last thread was being released | |
124 // a new task was added to the queue, we need to try | |
125 // to revoke the thread to avoid the situation when the task is left unprocessed | |
126 if (last && m_hasTasks.WaitOne(0)) { | |
127 if (AllocateThreadSlot(1)) | |
128 return true; // spin again... | |
129 else | |
130 // we failed to reallocate the first slot for this thread | |
131 // therefore we need to release the event | |
132 m_hasTasks.Set(); | |
133 } | |
134 | |
135 return false; | |
136 } | |
137 | |
138 Sleep(-1); | |
139 | |
140 return true; | |
90 } | 141 } |
91 | 142 |
92 #region thread slots traits | 143 #region thread slots traits |
93 | 144 |
94 bool AllocateThreadSlot() { | 145 bool AllocateThreadSlot() { |
95 int current; | 146 int current; |
96 // use spins to allocate slot for the new thread | 147 // use spins to allocate slot for the new thread |
97 do { | 148 do { |
98 current = m_runningThreads; | 149 current = m_createdThreads; |
99 if (current >= m_maxThreads || m_exitRequired != 0) | 150 if (current >= m_maxThreads || m_exitRequired != 0) |
100 // no more slots left or the pool has been disposed | 151 // no more slots left or the pool has been disposed |
101 return false; | 152 return false; |
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | 153 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); |
103 | 154 |
104 UpdateMaxThreads(current + 1); | 155 UpdateMaxThreads(current + 1); |
105 | 156 |
106 return true; | 157 return true; |
107 } | 158 } |
108 | 159 |
109 bool AllocateThreadSlot(int desired) { | 160 bool AllocateThreadSlot(int desired) { |
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) | 161 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) |
111 return false; | 162 return false; |
112 | 163 |
113 UpdateMaxThreads(desired); | 164 UpdateMaxThreads(desired); |
114 | 165 |
115 return true; | 166 return true; |
118 bool ReleaseThreadSlot(out bool last) { | 169 bool ReleaseThreadSlot(out bool last) { |
119 last = false; | 170 last = false; |
120 int current; | 171 int current; |
121 // use spins to release slot for the new thread | 172 // use spins to release slot for the new thread |
122 do { | 173 do { |
123 current = m_runningThreads; | 174 current = m_createdThreads; |
124 if (current <= m_minThreads && m_exitRequired == 0) | 175 if (current <= m_minThreads && m_exitRequired == 0) |
125 // the thread is reserved | 176 // the thread is reserved |
126 return false; | 177 return false; |
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); | 178 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); |
128 | 179 |
129 last = (current == 1); | 180 last = (current == 1); |
130 | 181 |
131 return true; | 182 return true; |
132 } | 183 } |
134 /// <summary> | 185 /// <summary> |
135 /// releases thread slot unconditionally, used during cleanup | 186 /// releases thread slot unconditionally, used during cleanup |
136 /// </summary> | 187 /// </summary> |
137 /// <returns>true - no more threads left</returns> | 188 /// <returns>true - no more threads left</returns> |
138 bool ReleaseThreadSlotAnyway() { | 189 bool ReleaseThreadSlotAnyway() { |
139 var left = Interlocked.Decrement(ref m_runningThreads); | 190 var left = Interlocked.Decrement(ref m_createdThreads); |
140 return left == 0; | 191 return left == 0; |
141 } | 192 } |
142 | 193 |
143 void UpdateMaxThreads(int count) { | 194 void UpdateMaxThreads(int count) { |
144 int max; | 195 int max; |
167 bool FetchTask(out TUnit unit) { | 218 bool FetchTask(out TUnit unit) { |
168 do { | 219 do { |
169 // exit if requested | 220 // exit if requested |
170 if (m_exitRequired != 0) { | 221 if (m_exitRequired != 0) { |
171 // release the thread slot | 222 // release the thread slot |
223 Interlocked.Decrement(ref m_activeThreads); | |
172 if (ReleaseThreadSlotAnyway()) // it was the last worker | 224 if (ReleaseThreadSlotAnyway()) // it was the last worker |
173 m_hasTasks.Dispose(); | 225 m_hasTasks.Dispose(); |
174 else | 226 else |
175 m_hasTasks.Set(); // wake next worker | 227 m_hasTasks.Set(); // wake next worker |
176 unit = default(TUnit); | 228 unit = default(TUnit); |
181 if (TryDequeue(out unit)) { | 233 if (TryDequeue(out unit)) { |
182 ExtendPool(); | 234 ExtendPool(); |
183 return true; | 235 return true; |
184 } | 236 } |
185 | 237 |
186 //no tasks left, exit if the thread is no longer needed | 238 Interlocked.Decrement(ref m_activeThreads); |
187 bool last; | |
188 if (ReleaseThreadSlot(out last)) { | |
189 if (last && m_hasTasks.WaitOne(0)) { | |
190 if (AllocateThreadSlot(1)) | |
191 continue; // spin again... | |
192 else | |
193 // we failed to reallocate slot for this thread | |
194 // therefore we need to release the event | |
195 m_hasTasks.Set(); | |
196 } | |
197 | |
198 return false; | |
199 } | |
200 | 239 |
201 // entering suspend state | 240 // entering suspend state |
202 Interlocked.Increment(ref m_suspended); | |
203 // keep this thread and wait | 241 // keep this thread and wait |
204 Suspend(); | 242 if (!Suspend()) |
205 Interlocked.Decrement(ref m_suspended); | 243 return false; |
244 | |
245 Interlocked.Increment(ref m_activeThreads); | |
206 } while (true); | 246 } while (true); |
207 } | 247 } |
208 | 248 |
209 protected abstract void InvokeUnit(TUnit unit); | 249 protected abstract void InvokeUnit(TUnit unit); |
210 | 250 |
211 void Worker() { | 251 void Worker() { |
212 TUnit unit; | 252 TUnit unit; |
253 Interlocked.Increment(ref m_activeThreads); | |
213 while (FetchTask(out unit)) | 254 while (FetchTask(out unit)) |
214 InvokeUnit(unit); | 255 InvokeUnit(unit); |
215 } | 256 } |
216 | 257 |
217 protected virtual void Dispose(bool disposing) { | 258 protected virtual void Dispose(bool disposing) { |