comparison Implab/Parallels/DispatchPool.cs @ 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400
parents 7cd4a843b4e4
children 1c3b3d518480
comparison
equal deleted inserted replaced
6:dfa21d507bc5 18:0c924dff5498
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of dedicated background threads that repeatedly take
    /// work units of type <typeparamref name="TUnit"/> from a source provided by the
    /// derived class and execute them.
    /// </summary>
    /// <remarks>
    /// The derived class supplies the queue (<see cref="TryDequeue"/>) and the way a
    /// unit is executed (<see cref="InvokeUnit"/>). The number of workers is kept
    /// between the configured minimum and maximum using interlocked counters; idle
    /// workers above the minimum exit, the remaining ones block in <see cref="Suspend"/>.
    /// </remarks>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreads;
        readonly int m_maxThreads;
        int m_runningThreads;    // number of currently allocated worker slots
        int m_maxRunningThreads; // highest value m_runningThreads has reached so far
        int m_suspended;         // number of workers currently inside Suspend()
        int m_exitRequired;      // 0 while the pool is alive, 1 once Dispose was requested
        readonly AutoResetEvent m_hasTasks = new AutoResetEvent(false);

        /// <summary>
        /// Creates a pool that keeps at least <paramref name="min"/> and at most
        /// <paramref name="max"/> worker threads.
        /// </summary>
        /// <param name="min">Number of reserved workers; values above <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">Upper bound of simultaneously allocated workers.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreads = min;
            m_maxThreads = max;
        }

        /// <summary>
        /// Creates a pool with a fixed number of workers (minimum equals maximum).
        /// </summary>
        /// <param name="threads">Number of worker threads.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates a pool without reserved workers whose upper bound is the maximum
        /// number of worker threads reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        protected DispatchPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreads = 0;
            m_maxThreads = maxThreads;
        }

        /// <summary>
        /// Starts the reserved (minimum) number of workers. The constructors do not
        /// call this method; it is left to the derived class.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// Current number of allocated worker slots.
        /// </summary>
        public int ThreadCount {
            get {
                return m_runningThreads;
            }
        }

        /// <summary>
        /// The highest number of allocated worker slots recorded so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once disposal of the pool has been requested.
        /// </summary>
        protected bool IsDisposed {
            get {
                return m_exitRequired != 0;
            }
        }

        /// <summary>
        /// Tries to take the next unit of work. Invoked by the worker threads, so it
        /// may be called from several threads.
        /// </summary>
        /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was obtained; otherwise <c>false</c>.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Called by a worker right after it has dequeued a unit: wakes a suspended
        /// worker if there is one, otherwise tries to start an additional worker.
        /// </summary>
        /// <returns>
        /// <c>true</c> if a suspended worker was signalled or a new worker was started;
        /// <c>false</c> if no slot could be allocated.
        /// </returns>
        protected virtual bool ExtendPool() {
            if (m_suspended > 0) {
                m_hasTasks.Set();
                return true;
            }

            return StartWorker();
        }

        /// <summary>
        /// Signals the event that wakes one sleeping worker and, if the pool currently
        /// has no workers at all, starts the first one.
        /// </summary>
        protected void WakePool() {
            m_hasTasks.Set(); // wake sleeping thread

            // a worker is started here only for the 0 -> 1 transition
            if (AllocateThreadSlot(1))
                LaunchWorkerThread();
        }

        /// <summary>
        /// Blocks the calling worker until the pool event is signalled.
        /// </summary>
        protected virtual void Suspend() {
            m_hasTasks.WaitOne();
        }

        #region thread slots traits

        /// <summary>
        /// Atomically increments the worker count unless the maximum has been reached
        /// or the pool is being disposed.
        /// </summary>
        /// <returns><c>true</c> if a slot was allocated.</returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_runningThreads;
                if (current >= m_maxThreads || m_exitRequired != 0)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Allocates a slot only if doing so makes the worker count exactly
        /// <paramref name="desired"/> (a single compare-and-swap from <c>desired - 1</c>).
        /// </summary>
        /// <param name="desired">The worker count expected after the allocation.</param>
        /// <returns><c>true</c> if the slot was allocated.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Atomically decrements the worker count unless the calling worker is one of
        /// the reserved (minimum) workers of a pool that is not being disposed.
        /// </summary>
        /// <param name="last">Set to <c>true</c> when the released slot was the only one left.</param>
        /// <returns><c>true</c> if the slot was released; <c>false</c> if the worker must stay.</returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release slot for the new thread
            do {
                current = m_runningThreads;
                if (current <= m_minThreads && m_exitRequired == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// releases thread slot unconditionally, used during cleanup
        /// </summary>
        /// <returns>true - no more threads left</returns>
        bool ReleaseThreadSlotAnyway() {
            var left = Interlocked.Decrement(ref m_runningThreads);
            return left == 0;
        }

        /// <summary>
        /// Raises <c>m_maxRunningThreads</c> to <paramref name="count"/> if it is
        /// currently lower.
        /// </summary>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Allocates a slot and starts a worker thread for it.
        /// </summary>
        /// <returns><c>true</c> if a worker was started; <c>false</c> if no slot was available.</returns>
        bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            LaunchWorkerThread();
            return true;
        }

        /// <summary>
        /// Creates and starts a background thread running <see cref="Worker"/>.
        /// The caller must already own the slot for the new thread.
        /// </summary>
        void LaunchWorkerThread() {
            var worker = new Thread(this.Worker);
            worker.IsBackground = true;
            worker.Start();
        }

        /// <summary>
        /// Obtains the next unit for the calling worker, suspending the worker while
        /// there is nothing to do.
        /// </summary>
        /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
        /// <returns>
        /// <c>true</c> if a unit was fetched; <c>false</c> if the worker has released its
        /// slot and must exit.
        /// </returns>
        bool FetchTask(out TUnit unit) {
            do {
                // exit if requested
                if (m_exitRequired != 0) {
                    // Wake the next worker while this one still owns its slot. The event
                    // is disposed below only by the worker that brings the count to zero,
                    // so signalling before the release cannot race with that disposal
                    // (signalling afterwards could hit an already disposed handle).
                    m_hasTasks.Set();

                    // release the thread slot
                    if (ReleaseThreadSlotAnyway()) // it was the last worker
                        m_hasTasks.Dispose();

                    unit = default(TUnit);
                    return false;
                }

                // fetch task
                if (TryDequeue(out unit)) {
                    ExtendPool();
                    return true;
                }

                //no tasks left, exit if the thread is no longer needed
                bool last;
                if (ReleaseThreadSlot(out last)) {
                    // the last worker consumes a pending signal (if any) so that a wake-up
                    // issued while it was leaving is not lost
                    if (last && m_hasTasks.WaitOne(0)) {
                        if (AllocateThreadSlot(1))
                            continue; // spin again...
                        else
                            // we failed to reallocate slot for this thread
                            // therefore we need to release the event
                            m_hasTasks.Set();
                    }

                    return false;
                }

                // entering suspend state
                Interlocked.Increment(ref m_suspended);
                // keep this thread and wait
                Suspend();
                Interlocked.Decrement(ref m_suspended);
            } while (true);
        }

        /// <summary>
        /// Executes a unit of work on the calling worker thread.
        /// </summary>
        /// <param name="unit">The unit returned by <see cref="TryDequeue"/>.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// Worker thread body: executes units until <see cref="FetchTask"/> tells the
        /// thread to exit.
        /// </summary>
        void Worker() {
            TUnit unit;
            while (FetchTask(out unit))
                InvokeUnit(unit);
        }

        /// <summary>
        /// Requests shutdown of the pool. Only the first call with
        /// <paramref name="disposing"/> set to <c>true</c> has an effect: it raises the
        /// exit flag and signals the pool event. The method does not wait for the
        /// workers; they leave through <see cref="FetchTask"/>, where the last one
        /// disposes the event.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called
        /// from the finalizer, in which case nothing is done.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (!disposing)
                return;

            // only the caller that flips the flag performs the shutdown
            if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
                return;

            // wake sleeping threads
            m_hasTasks.Set();
        }

        /// <summary>
        /// Requests shutdown of the pool and suppresses finalization of this instance.
        /// </summary>
        public void Dispose() {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}