Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 18:0c924dff5498
Слияние с promises
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:27:04 +0400 |
parents | 7cd4a843b4e4 |
children | 1c3b3d518480 |
comparison
equal
deleted
inserted
replaced
6:dfa21d507bc5 | 18:0c924dff5498 |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of dedicated background threads which fetch units of work
    /// through <see cref="TryDequeue"/> and execute them through <see cref="InvokeUnit"/>.
    /// </summary>
    /// <remarks>
    /// The number of running workers is tracked with interlocked operations only, no locks
    /// are taken. Idle workers above the configured minimum exit, idle workers at or below
    /// the minimum are parked on an auto-reset event until they are signalled.
    /// </remarks>
    /// <typeparam name="TUnit">The type of a unit of work handled by the pool.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        // idle workers are kept alive while the running count is at or below this value
        readonly int m_minThreads;
        // upper bound for the number of simultaneously running workers
        readonly int m_maxThreads;
        // number of currently allocated thread slots
        int m_runningThreads = 0;
        // the highest number of allocated slots observed so far
        int m_maxRunningThreads = 0;
        // number of workers currently parked in Suspend()
        int m_suspended = 0;
        // becomes non-zero once the pool is disposed
        int m_exitRequired = 0;
        // wakes one parked worker per signal
        readonly AutoResetEvent m_hasTasks = new AutoResetEvent(false);

        /// <summary>
        /// Creates the pool with the specified thread limits. No threads are started here,
        /// see <see cref="InitPool"/>.
        /// </summary>
        /// <param name="min">The number of workers kept alive while idle; a value greater than <paramref name="max"/> is clamped to <paramref name="max"/>.</param>
        /// <param name="max">The maximum number of workers.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreads = min;
            m_maxThreads = max;
        }

        /// <summary>
        /// Creates the pool where the minimum and the maximum number of workers are the same.
        /// </summary>
        /// <param name="threads">The number of workers.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved workers and the maximum taken from the
        /// worker thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        protected DispatchPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreads = 0;
            m_maxThreads = maxThreads;
        }

        /// <summary>
        /// Tries to start the minimum number of workers.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// The number of currently allocated thread slots.
        /// </summary>
        public int ThreadCount {
            get {
                return m_runningThreads;
            }
        }

        /// <summary>
        /// The highest number of allocated thread slots recorded so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once <see cref="Dispose()"/> has requested the workers to exit.
        /// </summary>
        protected bool IsDisposed {
            get {
                return m_exitRequired != 0;
            }
        }

        /// <summary>
        /// Fetches the next unit of work.
        /// </summary>
        /// <param name="unit">Receives the fetched unit.</param>
        /// <returns><c>true</c> if a unit has been fetched, <c>false</c> if there is nothing to do.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Brings one more worker into play: signals a parked worker if there is one,
        /// otherwise tries to start a new thread.
        /// </summary>
        /// <returns><c>true</c> if a worker has been signalled or started, <c>false</c> if a new worker could not be started.</returns>
        protected virtual bool ExtendPool() {
            if (m_suspended > 0) {
                m_hasTasks.Set();
                return true;
            } else
                return StartWorker();
        }

        /// <summary>
        /// Sets the event which wakes one sleeping worker and, if there were no workers
        /// at all, starts a new one.
        /// </summary>
        protected void WakePool() {
            m_hasTasks.Set(); // wake sleeping thread;

            // only the transition from zero to one worker is handled here
            if (AllocateThreadSlot(1))
                StartWorkerThread();
        }

        /// <summary>
        /// Parks the calling worker until the pool is signalled.
        /// </summary>
        protected virtual void Suspend() {
            m_hasTasks.WaitOne();
        }

        #region thread slots traits

        /// <summary>
        /// Allocates a slot for a new worker.
        /// </summary>
        /// <returns><c>false</c> if the maximum is reached or the pool is disposed.</returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_runningThreads;
                if (current >= m_maxThreads || m_exitRequired != 0)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Allocates a slot only if it makes the number of slots exactly <paramref name="desired"/>,
        /// i.e. the current number is <c>desired - 1</c>. A single attempt is made.
        /// </summary>
        /// <param name="desired">The number of slots expected after the allocation.</param>
        /// <returns><c>true</c> if the slot has been allocated.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Releases the slot of the calling worker unless the worker is reserved, i.e. the
        /// number of slots is at or below the minimum and the pool is not disposed.
        /// </summary>
        /// <param name="last">Receives <c>true</c> if the released slot was the last one.</param>
        /// <returns><c>true</c> if the slot has been released, <c>false</c> if the worker must stay.</returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the calling thread
            do {
                current = m_runningThreads;
                if (current <= m_minThreads && m_exitRequired == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// releases thread slot unconditionally, used during cleanup
        /// </summary>
        /// <returns>true - no more threads left</returns>
        bool ReleaseThreadSlotAnyway() {
            var left = Interlocked.Decrement(ref m_runningThreads);
            return left == 0;
        }

        /// <summary>
        /// Raises the recorded maximum of allocated slots to <paramref name="count"/>
        /// if it is currently lower.
        /// </summary>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Allocates a slot and starts a new worker in it.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started, <c>false</c> if no slot is available.</returns>
        bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            StartWorkerThread();
            return true;
        }

        /// <summary>
        /// Starts a background thread for a slot which is already allocated by the caller.
        /// If the thread can't be created or started the slot is given back and the
        /// exception is rethrown.
        /// </summary>
        void StartWorkerThread() {
            try {
                var worker = new Thread(this.Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the worker never ran, so nobody else would ever release its slot
                Interlocked.Decrement(ref m_runningThreads);
                throw;
            }
        }

        /// <summary>
        /// Signals the event during shutdown to pass the wake-up to the next worker.
        /// </summary>
        /// <remarks>
        /// The caller has already released its slot, therefore another worker may have
        /// released the last slot and disposed the event in the meantime. In that case
        /// there is nobody left to wake and the error is ignored.
        /// </remarks>
        void WakeNextWorkerOnExit() {
            try {
                m_hasTasks.Set();
            } catch (ObjectDisposedException) {
                // the last worker has already gone and cleaned up the event
            }
        }

        /// <summary>
        /// Gets the next unit of work for the calling worker, parking the worker when it
        /// is reserved and there is nothing to do.
        /// </summary>
        /// <param name="unit">Receives the fetched unit.</param>
        /// <returns><c>true</c> if a unit has been fetched, <c>false</c> if the worker has released its slot and must exit.</returns>
        bool FetchTask(out TUnit unit) {
            do {
                // exit if requested
                if (m_exitRequired != 0) {
                    // release the thread slot
                    if (ReleaseThreadSlotAnyway()) // it was the last worker
                        m_hasTasks.Dispose();
                    else
                        WakeNextWorkerOnExit(); // wake next worker
                    unit = default(TUnit);
                    return false;
                }

                // fetch task
                if (TryDequeue(out unit)) {
                    ExtendPool();
                    return true;
                }

                //no tasks left, exit if the thread is no longer needed
                bool last;
                if (ReleaseThreadSlot(out last)) {
                    // the last worker consumes a pending signal, otherwise the wake-up
                    // sent just before the slot was released would be lost
                    if (last && m_hasTasks.WaitOne(0)) {
                        if (AllocateThreadSlot(1))
                            continue; // spin again...
                        else
                            // we failed to reallocate slot for this thread
                            // therefore we need to release the event
                            m_hasTasks.Set();
                    }

                    return false;
                }

                // entering suspend state
                Interlocked.Increment(ref m_suspended);
                // keep this thread and wait
                Suspend();
                Interlocked.Decrement(ref m_suspended);
            } while (true);
        }

        /// <summary>
        /// Executes the specified unit of work on the calling worker thread.
        /// </summary>
        /// <param name="unit">The unit to execute.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: executes units until <see cref="FetchTask"/>
        /// tells the worker to exit.
        /// </summary>
        void Worker() {
            TUnit unit;
            while (FetchTask(out unit))
                InvokeUnit(unit);
        }

        /// <summary>
        /// Requests the workers to exit. Only the first call with
        /// <paramref name="disposing"/> set to <c>true</c> has an effect.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>, <c>false</c> when called from the finalizer.</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (m_exitRequired == 0) {
                    // only one caller wins the transition to the disposed state
                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
                        return;

                    // wake sleeping threads
                    m_hasTasks.Set();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Requests the workers to exit.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}