Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 101:279e226dffdd v2
code cleanup
added EnsureDispatched extension
author | cin |
---|---|
date | Thu, 06 Nov 2014 20:03:19 +0300 |
parents | dc4942d09e74 |
children | f803565868a4 |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Threading; | |
3 | |
4 namespace Implab.Parallels { | |
5 public abstract class DispatchPool<TUnit> : IDisposable { | |
81 | 6 readonly int m_minThreadsLimit; |
7 readonly int m_maxThreadsLimit; | |
8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
9 |
93 | 10 int m_threads; // the current size of the pool |
11 int m_maxRunningThreads; // the meximum reached size of the pool | |
12 int m_exit; // the pool is going to shutdown, all unused workers are released | |
80 | 13 |
81 | 14 readonly object m_signal = new object(); // used to pulse waiting threads |
15 | 15 |
16 protected DispatchPool(int min, int max) { | |
17 if (min < 0) | |
18 throw new ArgumentOutOfRangeException("min"); | |
19 if (max <= 0) | |
20 throw new ArgumentOutOfRangeException("max"); | |
21 | |
22 if (min > max) | |
23 min = max; | |
81 | 24 m_minThreadsLimit = min; |
25 m_maxThreadsLimit = max; | |
15 | 26 } |
27 | |
28 protected DispatchPool(int threads) | |
29 : this(threads, threads) { | |
30 } | |
31 | |
32 protected DispatchPool() { | |
33 int maxThreads, maxCP; | |
34 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
35 | |
81 | 36 m_minThreadsLimit = 0; |
37 m_maxThreadsLimit = maxThreads; | |
15 | 38 } |
39 | |
40 protected void InitPool() { | |
81 | 41 for (int i = 0; i < m_minThreadsLimit; i++) |
15 | 42 StartWorker(); |
43 } | |
44 | |
20 | 45 public int PoolSize { |
15 | 46 get { |
80 | 47 Thread.MemoryBarrier(); |
81 | 48 return m_threads; |
20 | 49 } |
50 } | |
81 | 51 |
15 | 52 public int MaxRunningThreads { |
53 get { | |
80 | 54 Thread.MemoryBarrier(); |
15 | 55 return m_maxRunningThreads; |
56 } | |
57 } | |
58 | |
59 protected bool IsDisposed { | |
60 get { | |
80 | 61 Thread.MemoryBarrier(); |
81 | 62 return m_exit == 1; |
15 | 63 } |
64 } | |
65 | |
17 | 66 protected abstract bool TryDequeue(out TUnit unit); |
67 | |
89 | 68 bool Dequeue(out TUnit unit, int timeout) { |
81 | 69 int ts = Environment.TickCount; |
70 if (TryDequeue(out unit)) | |
71 return true; | |
72 lock (m_signal) { | |
73 while (!TryDequeue(out unit) && m_exit == 0) | |
74 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) { | |
75 // timeout | |
76 return false; | |
80 | 77 } |
81 | 78 // queue item or terminate |
79 Monitor.Pulse(m_signal); | |
80 if (m_exit == 1) | |
81 return false; | |
80 | 82 } |
81 | 83 return true; |
22 | 84 } |
85 | |
81 | 86 protected void SignalThread() { |
87 lock (m_signal) { | |
88 Monitor.Pulse(m_signal); | |
30 | 89 } |
90 } | |
91 | |
17 | 92 #region thread slots traits |
93 | |
94 bool AllocateThreadSlot() { | |
16 | 95 int current; |
15 | 96 // use spins to allocate slot for the new thread |
97 do { | |
81 | 98 current = m_threads; |
99 if (current >= m_maxThreadsLimit || m_exit == 1) | |
15 | 100 // no more slots left or the pool has been disposed |
101 return false; | |
81 | 102 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current)); |
15 | 103 |
17 | 104 UpdateMaxThreads(current + 1); |
105 | |
106 return true; | |
107 } | |
15 | 108 |
17 | 109 bool AllocateThreadSlot(int desired) { |
81 | 110 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1)) |
17 | 111 return false; |
112 | |
113 UpdateMaxThreads(desired); | |
15 | 114 |
17 | 115 return true; |
116 } | |
117 | |
118 bool ReleaseThreadSlot(out bool last) { | |
119 last = false; | |
120 int current; | |
121 // use spins to release slot for the new thread | |
80 | 122 Thread.MemoryBarrier(); |
17 | 123 do { |
81 | 124 current = m_threads; |
125 if (current <= m_minThreadsLimit && m_exit == 0) | |
17 | 126 // the thread is reserved |
127 return false; | |
81 | 128 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current)); |
17 | 129 |
130 last = (current == 1); | |
15 | 131 |
132 return true; | |
133 } | |
134 | |
17 | 135 void UpdateMaxThreads(int count) { |
136 int max; | |
16 | 137 do { |
17 | 138 max = m_maxRunningThreads; |
139 if (max >= count) | |
140 break; | |
141 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
16 | 142 } |
143 | |
17 | 144 #endregion |
145 | |
81 | 146 protected bool StartWorker() { |
17 | 147 if (AllocateThreadSlot()) { |
148 // slot successfully allocated | |
92 | 149 var worker = new Thread(Worker); |
17 | 150 worker.IsBackground = true; |
151 worker.Start(); | |
152 | |
153 return true; | |
154 } | |
92 | 155 return false; |
16 | 156 } |
157 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
158 protected abstract void InvokeUnit(TUnit unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
159 |
41 | 160 protected virtual void Worker() { |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
161 TUnit unit; |
81 | 162 bool last; |
15 | 163 do { |
81 | 164 while (Dequeue(out unit, m_releaseTimeout)) { |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
165 InvokeUnit(unit); |
81 | 166 } |
167 if(!ReleaseThreadSlot(out last)) | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
168 continue; |
81 | 169 // queue may be not empty |
170 if (last && TryDequeue(out unit)) { | |
171 InvokeUnit(unit); | |
172 if (AllocateThreadSlot(1)) | |
173 continue; | |
174 // we can safely exit since pool is alive | |
15 | 175 } |
81 | 176 break; |
177 } while(true); | |
178 } | |
15 | 179 |
180 | |
181 protected virtual void Dispose(bool disposing) { | |
182 if (disposing) { | |
81 | 183 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier |
15 | 184 // wake sleeping threads |
81 | 185 SignalThread(); |
15 | 186 GC.SuppressFinalize(this); |
187 } | |
188 } | |
189 } | |
190 | |
191 public void Dispose() { | |
192 Dispose(true); | |
193 } | |
194 | |
195 ~DispatchPool() { | |
196 Dispose(false); | |
197 } | |
198 } | |
199 } |