Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 196:40d7fed4a09e
fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
author | cin |
---|---|
date | Mon, 29 Aug 2016 23:15:51 +0300 |
parents | f803565868a4 |
children |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Threading; | |
3 | |
4 namespace Implab.Parallels { | |
5 public abstract class DispatchPool<TUnit> : IDisposable { | |
81 | 6 readonly int m_minThreadsLimit; |
7 readonly int m_maxThreadsLimit; | |
8 readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
9 |
93 | 10 int m_threads; // the current size of the pool |
11 int m_maxRunningThreads; // the meximum reached size of the pool | |
12 int m_exit; // the pool is going to shutdown, all unused workers are released | |
80 | 13 |
81 | 14 readonly object m_signal = new object(); // used to pulse waiting threads |
15 | 15 |
16 protected DispatchPool(int min, int max) { | |
17 if (min < 0) | |
18 throw new ArgumentOutOfRangeException("min"); | |
19 if (max <= 0) | |
20 throw new ArgumentOutOfRangeException("max"); | |
21 | |
22 if (min > max) | |
23 min = max; | |
81 | 24 m_minThreadsLimit = min; |
25 m_maxThreadsLimit = max; | |
15 | 26 } |
27 | |
28 protected DispatchPool(int threads) | |
29 : this(threads, threads) { | |
30 } | |
31 | |
32 protected DispatchPool() { | |
33 | |
81 | 34 m_minThreadsLimit = 0; |
125 | 35 m_maxThreadsLimit = Environment.ProcessorCount; |
15 | 36 } |
37 | |
38 protected void InitPool() { | |
81 | 39 for (int i = 0; i < m_minThreadsLimit; i++) |
15 | 40 StartWorker(); |
41 } | |
42 | |
20 | 43 public int PoolSize { |
15 | 44 get { |
80 | 45 Thread.MemoryBarrier(); |
81 | 46 return m_threads; |
20 | 47 } |
48 } | |
81 | 49 |
15 | 50 public int MaxRunningThreads { |
51 get { | |
80 | 52 Thread.MemoryBarrier(); |
15 | 53 return m_maxRunningThreads; |
54 } | |
55 } | |
56 | |
57 protected bool IsDisposed { | |
58 get { | |
80 | 59 Thread.MemoryBarrier(); |
81 | 60 return m_exit == 1; |
15 | 61 } |
62 } | |
63 | |
17 | 64 protected abstract bool TryDequeue(out TUnit unit); |
65 | |
89 | 66 bool Dequeue(out TUnit unit, int timeout) { |
81 | 67 int ts = Environment.TickCount; |
68 if (TryDequeue(out unit)) | |
69 return true; | |
70 lock (m_signal) { | |
71 while (!TryDequeue(out unit) && m_exit == 0) | |
72 if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) { | |
73 // timeout | |
74 return false; | |
80 | 75 } |
81 | 76 // queue item or terminate |
77 Monitor.Pulse(m_signal); | |
78 if (m_exit == 1) | |
79 return false; | |
80 | 80 } |
81 | 81 return true; |
22 | 82 } |
83 | |
81 | 84 protected void SignalThread() { |
85 lock (m_signal) { | |
86 Monitor.Pulse(m_signal); | |
30 | 87 } |
88 } | |
89 | |
17 | 90 #region thread slots traits |
91 | |
92 bool AllocateThreadSlot() { | |
16 | 93 int current; |
15 | 94 // use spins to allocate slot for the new thread |
95 do { | |
81 | 96 current = m_threads; |
97 if (current >= m_maxThreadsLimit || m_exit == 1) | |
15 | 98 // no more slots left or the pool has been disposed |
99 return false; | |
81 | 100 } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current)); |
15 | 101 |
17 | 102 UpdateMaxThreads(current + 1); |
103 | |
104 return true; | |
105 } | |
15 | 106 |
17 | 107 bool AllocateThreadSlot(int desired) { |
81 | 108 if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1)) |
17 | 109 return false; |
110 | |
111 UpdateMaxThreads(desired); | |
15 | 112 |
17 | 113 return true; |
114 } | |
115 | |
116 bool ReleaseThreadSlot(out bool last) { | |
117 last = false; | |
118 int current; | |
119 // use spins to release slot for the new thread | |
80 | 120 Thread.MemoryBarrier(); |
17 | 121 do { |
81 | 122 current = m_threads; |
123 if (current <= m_minThreadsLimit && m_exit == 0) | |
17 | 124 // the thread is reserved |
125 return false; | |
81 | 126 } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current)); |
17 | 127 |
128 last = (current == 1); | |
15 | 129 |
130 return true; | |
131 } | |
132 | |
17 | 133 void UpdateMaxThreads(int count) { |
134 int max; | |
16 | 135 do { |
17 | 136 max = m_maxRunningThreads; |
137 if (max >= count) | |
138 break; | |
139 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
16 | 140 } |
141 | |
17 | 142 #endregion |
143 | |
81 | 144 protected bool StartWorker() { |
17 | 145 if (AllocateThreadSlot()) { |
146 // slot successfully allocated | |
92 | 147 var worker = new Thread(Worker); |
17 | 148 worker.IsBackground = true; |
149 worker.Start(); | |
150 | |
151 return true; | |
152 } | |
92 | 153 return false; |
16 | 154 } |
155 | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
156 protected abstract void InvokeUnit(TUnit unit); |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
157 |
41 | 158 protected virtual void Worker() { |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
159 TUnit unit; |
81 | 160 bool last; |
15 | 161 do { |
81 | 162 while (Dequeue(out unit, m_releaseTimeout)) { |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
163 InvokeUnit(unit); |
81 | 164 } |
165 if(!ReleaseThreadSlot(out last)) | |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
166 continue; |
81 | 167 // queue may be not empty |
168 if (last && TryDequeue(out unit)) { | |
169 InvokeUnit(unit); | |
170 if (AllocateThreadSlot(1)) | |
171 continue; | |
172 // we can safely exit since pool is alive | |
15 | 173 } |
81 | 174 break; |
175 } while(true); | |
176 } | |
15 | 177 |
178 | |
179 protected virtual void Dispose(bool disposing) { | |
180 if (disposing) { | |
81 | 181 if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier |
15 | 182 // wake sleeping threads |
81 | 183 SignalThread(); |
15 | 184 GC.SuppressFinalize(this); |
185 } | |
186 } | |
187 } | |
188 | |
189 public void Dispose() { | |
190 Dispose(true); | |
191 } | |
192 | |
193 ~DispatchPool() { | |
194 Dispose(false); | |
195 } | |
196 } | |
197 } |