comparison Implab/Parallels/DispatchPool.cs @ 20:1c3b3d518480 promises

refactoring, sync
author cin
date Tue, 12 Nov 2013 02:27:22 +0400
parents 7cd4a843b4e4
children 6a56df4ec59e
comparison
equal deleted inserted replaced
19:e3935fdf59a2 20:1c3b3d518480
7 7
8 namespace Implab.Parallels { 8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable { 9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads; 10 readonly int m_minThreads;
11 readonly int m_maxThreads; 11 readonly int m_maxThreads;
12 int m_runningThreads = 0; 12 int m_createdThreads = 0;
13 int m_activeThreads = 0;
14 int m_sleepingThreads = 0;
13 int m_maxRunningThreads = 0; 15 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0; 16 int m_exitRequired = 0;
17 int m_releaseTimeout = 100; // timeout while the working thread will wait for the new tasks before exit
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false); 18 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17 19
18 protected DispatchPool(int min, int max) { 20 protected DispatchPool(int min, int max) {
19 if (min < 0) 21 if (min < 0)
20 throw new ArgumentOutOfRangeException("min"); 22 throw new ArgumentOutOfRangeException("min");
42 protected void InitPool() { 44 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++) 45 for (int i = 0; i < m_minThreads; i++)
44 StartWorker(); 46 StartWorker();
45 } 47 }
46 48
47 public int ThreadCount { 49 public int PoolSize {
48 get { 50 get {
49 return m_runningThreads; 51 return m_createdThreads;
52 }
53 }
54
55 public int ActiveThreads {
56 get {
57 return m_activeThreads;
50 } 58 }
51 } 59 }
52 60
53 public int MaxRunningThreads { 61 public int MaxRunningThreads {
54 get { 62 get {
63 } 71 }
64 72
65 protected abstract bool TryDequeue(out TUnit unit); 73 protected abstract bool TryDequeue(out TUnit unit);
66 74
67 protected virtual bool ExtendPool() { 75 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) { 76 if (m_sleepingThreads == 0)
69 m_hasTasks.Set(); 77 // no sleeping workers are available
78 // try create one
79 return StartWorker();
80 else {
81 // a race condition is possible here when several threads ask to extend the pool
82 // and some sleeping threads have exited due to a timeout but are still counted as sleeping;
83 // in that case all of these threads could exit except one
84 WakePool();
70 return true; 85 return true;
71 } else 86 }
72 return StartWorker(); 87
73 } 88 }
74 89
75 /// <summary> 90 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока 91 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary> 92 /// </summary>
78 protected void WakePool() { 93 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread; 94 m_hasTasks.Set(); // wake sleeping thread;
80 95
81 if (AllocateThreadSlot(1)) { 96 if (AllocateThreadSlot(1)) {
97 // if there were no threads in the pool
82 var worker = new Thread(this.Worker); 98 var worker = new Thread(this.Worker);
83 worker.IsBackground = true; 99 worker.IsBackground = true;
84 worker.Start(); 100 worker.Start();
85 } 101 }
86 } 102 }
87 103
88 protected virtual void Suspend() { 104 bool Sleep(int timeout) {
89 m_hasTasks.WaitOne(); 105 Interlocked.Increment(ref m_sleepingThreads);
106 var result = m_hasTasks.WaitOne(timeout);
107 Interlocked.Decrement(ref m_sleepingThreads);
108 return result;
109 }
110
111 protected virtual bool Suspend() {
112 //no tasks left, exit if the thread is no longer needed
113 bool last;
114 bool requestExit;
115
116 if (m_releaseTimeout > 0)
117 requestExit = !Sleep(m_releaseTimeout);
118 else
119 requestExit = true;
120
121
122 if (requestExit && ReleaseThreadSlot(out last)) {
123 // if a new task was added to the queue at the very moment
124 // the last thread was being released, we need to try
125 // to re-acquire the thread slot so that the task is not left unprocessed
126 if (last && m_hasTasks.WaitOne(0)) {
127 if (AllocateThreadSlot(1))
128 return true; // spin again...
129 else
130 // we failed to reallocate the first slot for this thread
131 // therefore we need to release the event
132 m_hasTasks.Set();
133 }
134
135 return false;
136 }
137
138 Sleep(-1);
139
140 return true;
90 } 141 }
91 142
92 #region thread slots traits 143 #region thread slots traits
93 144
94 bool AllocateThreadSlot() { 145 bool AllocateThreadSlot() {
95 int current; 146 int current;
96 // use spins to allocate slot for the new thread 147 // use spins to allocate slot for the new thread
97 do { 148 do {
98 current = m_runningThreads; 149 current = m_createdThreads;
99 if (current >= m_maxThreads || m_exitRequired != 0) 150 if (current >= m_maxThreads || m_exitRequired != 0)
100 // no more slots left or the pool has been disposed 151 // no more slots left or the pool has been disposed
101 return false; 152 return false;
102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); 153 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
103 154
104 UpdateMaxThreads(current + 1); 155 UpdateMaxThreads(current + 1);
105 156
106 return true; 157 return true;
107 } 158 }
108 159
109 bool AllocateThreadSlot(int desired) { 160 bool AllocateThreadSlot(int desired) {
110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) 161 if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
111 return false; 162 return false;
112 163
113 UpdateMaxThreads(desired); 164 UpdateMaxThreads(desired);
114 165
115 return true; 166 return true;
118 bool ReleaseThreadSlot(out bool last) { 169 bool ReleaseThreadSlot(out bool last) {
119 last = false; 170 last = false;
120 int current; 171 int current;
121 // use spins to release slot for the new thread 172 // use spins to release slot for the new thread
122 do { 173 do {
123 current = m_runningThreads; 174 current = m_createdThreads;
124 if (current <= m_minThreads && m_exitRequired == 0) 175 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved 176 // the thread is reserved
126 return false; 177 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); 178 } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
128 179
129 last = (current == 1); 180 last = (current == 1);
130 181
131 return true; 182 return true;
132 } 183 }
134 /// <summary> 185 /// <summary>
135 /// releases thread slot unconditionally, used during cleanup 186 /// releases thread slot unconditionally, used during cleanup
136 /// </summary> 187 /// </summary>
137 /// <returns>true - no more threads left</returns> 188 /// <returns>true - no more threads left</returns>
138 bool ReleaseThreadSlotAnyway() { 189 bool ReleaseThreadSlotAnyway() {
139 var left = Interlocked.Decrement(ref m_runningThreads); 190 var left = Interlocked.Decrement(ref m_createdThreads);
140 return left == 0; 191 return left == 0;
141 } 192 }
142 193
143 void UpdateMaxThreads(int count) { 194 void UpdateMaxThreads(int count) {
144 int max; 195 int max;
167 bool FetchTask(out TUnit unit) { 218 bool FetchTask(out TUnit unit) {
168 do { 219 do {
169 // exit if requested 220 // exit if requested
170 if (m_exitRequired != 0) { 221 if (m_exitRequired != 0) {
171 // release the thread slot 222 // release the thread slot
223 Interlocked.Decrement(ref m_activeThreads);
172 if (ReleaseThreadSlotAnyway()) // it was the last worker 224 if (ReleaseThreadSlotAnyway()) // it was the last worker
173 m_hasTasks.Dispose(); 225 m_hasTasks.Dispose();
174 else 226 else
175 m_hasTasks.Set(); // wake next worker 227 m_hasTasks.Set(); // wake next worker
176 unit = default(TUnit); 228 unit = default(TUnit);
181 if (TryDequeue(out unit)) { 233 if (TryDequeue(out unit)) {
182 ExtendPool(); 234 ExtendPool();
183 return true; 235 return true;
184 } 236 }
185 237
186 //no tasks left, exit if the thread is no longer needed 238 Interlocked.Decrement(ref m_activeThreads);
187 bool last;
188 if (ReleaseThreadSlot(out last)) {
189 if (last && m_hasTasks.WaitOne(0)) {
190 if (AllocateThreadSlot(1))
191 continue; // spin again...
192 else
193 // we failed to reallocate slot for this thread
194 // therefore we need to release the event
195 m_hasTasks.Set();
196 }
197
198 return false;
199 }
200 239
201 // entering suspend state 240 // entering suspend state
202 Interlocked.Increment(ref m_suspended);
203 // keep this thread and wait 241 // keep this thread and wait
204 Suspend(); 242 if (!Suspend())
205 Interlocked.Decrement(ref m_suspended); 243 return false;
244
245 Interlocked.Increment(ref m_activeThreads);
206 } while (true); 246 } while (true);
207 } 247 }
208 248
209 protected abstract void InvokeUnit(TUnit unit); 249 protected abstract void InvokeUnit(TUnit unit);
210 250
211 void Worker() { 251 void Worker() {
212 TUnit unit; 252 TUnit unit;
253 Interlocked.Increment(ref m_activeThreads);
213 while (FetchTask(out unit)) 254 while (FetchTask(out unit))
214 InvokeUnit(unit); 255 InvokeUnit(unit);
215 } 256 }
216 257
217 protected virtual void Dispose(bool disposing) { 258 protected virtual void Dispose(bool disposing) {