Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 101:279e226dffdd v2
code cleanup
added EnsureDispatched extension
| author | cin |
|---|---|
| date | Thu, 06 Nov 2014 20:03:19 +0300 |
| parents | dc4942d09e74 |
| children | f803565868a4 |
| rev | line source |
|---|---|
| 15 | 1 using System; |
| 2 using System.Threading; | |
| 3 | |
| 4 namespace Implab.Parallels { | |
/// <summary>
/// Base class for a pool of dedicated background threads which take units of work
/// from a queue supplied by the derived class and process them.
/// </summary>
/// <remarks>
/// The derived class provides the queue through <see cref="TryDequeue"/>, the processing
/// through <see cref="InvokeUnit"/>, and is expected to call <see cref="SignalThread"/>
/// and/or <see cref="StartWorker"/> when new work arrives. The constructors do not start
/// any threads; call <see cref="InitPool"/> to start the minimum number of workers.
/// </remarks>
/// <typeparam name="TUnit">The type of a unit of work.</typeparam>
public abstract class DispatchPool<TUnit> : IDisposable {
    readonly int m_minThreadsLimit;
    readonly int m_maxThreadsLimit;
    readonly int m_releaseTimeout = 1000; // how long (ms) an idle worker waits for a new unit before it tries to exit

    int m_threads; // the current size of the pool
    int m_maxRunningThreads; // the maximum size the pool has reached
    int m_exit; // 1 when the pool is shutting down, all unused workers are released

    readonly object m_signal = new object(); // used to pulse waiting threads

    /// <summary>
    /// Creates the pool with the specified limits on the number of threads.
    /// </summary>
    /// <param name="min">The number of threads which are kept alive even when idle. Clamped to <paramref name="max"/>.</param>
    /// <param name="max">The maximum number of threads in the pool.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
    protected DispatchPool(int min, int max) {
        if (min < 0)
            throw new ArgumentOutOfRangeException("min");
        if (max <= 0)
            throw new ArgumentOutOfRangeException("max");

        if (min > max)
            min = max;
        m_minThreadsLimit = min;
        m_maxThreadsLimit = max;
    }

    /// <summary>
    /// Creates the pool with the same minimum and maximum number of threads.
    /// </summary>
    /// <param name="threads">The number of threads, must be positive.</param>
    protected DispatchPool(int threads)
        : this(threads, threads) {
    }

    /// <summary>
    /// Creates the pool with no reserved threads and the maximum taken from
    /// the worker thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
    /// </summary>
    protected DispatchPool() {
        int maxThreads, maxCP;
        ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

        m_minThreadsLimit = 0;
        m_maxThreadsLimit = maxThreads;
    }

    /// <summary>
    /// Starts one worker per reserved (minimum) thread.
    /// </summary>
    protected void InitPool() {
        for (int i = 0; i < m_minThreadsLimit; i++)
            StartWorker();
    }

    /// <summary>
    /// The current number of allocated thread slots.
    /// </summary>
    public int PoolSize {
        get {
            Thread.MemoryBarrier();
            return m_threads;
        }
    }

    /// <summary>
    /// The largest number of thread slots which have been allocated at the same time.
    /// </summary>
    public int MaxRunningThreads {
        get {
            Thread.MemoryBarrier();
            return m_maxRunningThreads;
        }
    }

    /// <summary>
    /// <c>true</c> once <see cref="Dispose()"/> has been called.
    /// </summary>
    protected bool IsDisposed {
        get {
            Thread.MemoryBarrier();
            return m_exit == 1;
        }
    }

    /// <summary>
    /// Tries to take the next unit from the queue without blocking.
    /// </summary>
    /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> if a unit has been taken from the queue.</returns>
    protected abstract bool TryDequeue(out TUnit unit);

    /// <summary>
    /// Takes the next unit from the queue, waiting for a signal if the queue is empty.
    /// </summary>
    /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
    /// <param name="timeout">The maximum total time to wait, in milliseconds.</param>
    /// <returns>
    /// <c>true</c> if a unit has been obtained; <c>false</c> if the wait timed out or
    /// the pool is shutting down and no unit was available.
    /// </returns>
    bool Dequeue(out TUnit unit, int timeout) {
        int ts = Environment.TickCount;
        if (TryDequeue(out unit))
            return true;
        lock (m_signal) {
            bool dequeued = TryDequeue(out unit);
            while (!dequeued && m_exit == 0) {
                // Environment.TickCount wraps around, so the elapsed time is computed
                // in an explicitly unchecked context to stay correct (and not to throw
                // when the assembly is compiled with overflow checking).
                int remaining = unchecked(timeout - (Environment.TickCount - ts));
                if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                    // timeout
                    return false;
                }
                dequeued = TryDequeue(out unit);
            }
            // queue item or terminate: relay the signal to the next waiting thread
            Monitor.Pulse(m_signal);
            // A unit which has already been removed from the queue is always handed to
            // the caller, even during shutdown, otherwise it would be silently lost.
            return dequeued;
        }
    }

    /// <summary>
    /// Wakes up one of the threads waiting for a new unit, if any.
    /// </summary>
    protected void SignalThread() {
        lock (m_signal) {
            Monitor.Pulse(m_signal);
        }
    }

    #region thread slots traits

    /// <summary>
    /// Reserves a slot for a new thread.
    /// </summary>
    /// <returns><c>false</c> if the pool has reached its maximum size or has been disposed.</returns>
    bool AllocateThreadSlot() {
        int current;
        // use spins to allocate slot for the new thread
        do {
            current = m_threads;
            if (current >= m_maxThreadsLimit || m_exit == 1)
                // no more slots left or the pool has been disposed
                return false;
        } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

        UpdateMaxThreads(current + 1);

        return true;
    }

    /// <summary>
    /// Reserves a slot only if that makes the pool size exactly <paramref name="desired"/>,
    /// i.e. the current size is <c>desired - 1</c>.
    /// </summary>
    /// <param name="desired">The pool size after the allocation.</param>
    /// <returns><c>true</c> if the slot has been allocated.</returns>
    bool AllocateThreadSlot(int desired) {
        if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
            return false;

        UpdateMaxThreads(desired);

        return true;
    }

    /// <summary>
    /// Releases the slot held by the calling worker.
    /// </summary>
    /// <param name="last">Set to <c>true</c> when the released slot was the last one (the pool size became zero).</param>
    /// <returns>
    /// <c>false</c> if the slot is reserved: the pool size does not exceed the minimum and the pool is not shutting down.
    /// </returns>
    bool ReleaseThreadSlot(out bool last) {
        last = false;
        int current;
        // use spins to release the slot
        Thread.MemoryBarrier();
        do {
            current = m_threads;
            if (current <= m_minThreadsLimit && m_exit == 0)
                // the thread is reserved
                return false;
        } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

        last = (current == 1);

        return true;
    }

    /// <summary>
    /// Raises <see cref="MaxRunningThreads"/> to <paramref name="count"/> if it is currently smaller.
    /// </summary>
    void UpdateMaxThreads(int count) {
        int max;
        do {
            max = m_maxRunningThreads;
            if (max >= count)
                break;
        } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
    }

    #endregion

    /// <summary>
    /// Starts a new background worker thread if the pool has a free slot.
    /// </summary>
    /// <returns><c>true</c> if the worker has been started; <c>false</c> if no slot is available.</returns>
    protected bool StartWorker() {
        if (!AllocateThreadSlot())
            return false;

        // slot successfully allocated
        try {
            var worker = new Thread(Worker);
            worker.IsBackground = true;
            worker.Start();
        } catch {
            // the worker has never run and will not release the slot itself,
            // give the slot back to keep the pool size accurate
            Interlocked.Decrement(ref m_threads);
            throw;
        }

        return true;
    }

    /// <summary>
    /// Processes a single unit taken from the queue. Called on a worker thread.
    /// </summary>
    /// <param name="unit">The unit to process.</param>
    protected abstract void InvokeUnit(TUnit unit);

    /// <summary>
    /// The body of a worker thread: processes units until the queue stays empty for
    /// the release timeout (or the pool shuts down) and the thread slot can be released.
    /// </summary>
    protected virtual void Worker() {
        TUnit unit;
        bool last;
        do {
            while (Dequeue(out unit, m_releaseTimeout)) {
                InvokeUnit(unit);
            }
            if(!ReleaseThreadSlot(out last))
                // the thread is reserved, keep waiting for units
                continue;
            // The slot is released, but the queue may be not empty: a unit could have
            // been enqueued while this last thread was leaving.
            if (last && TryDequeue(out unit)) {
                InvokeUnit(unit);
                if (AllocateThreadSlot(1))
                    continue;
                // The pool size is not zero anymore, so another slot has been
                // allocated in the meantime and this thread can safely exit.
            }
            break;
        } while(true);
    }


    /// <summary>
    /// Marks the pool as shutting down and wakes the waiting workers.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer, in which case nothing is done.</param>
    protected virtual void Dispose(bool disposing) {
        if (disposing) {
            if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                // wake all sleeping threads so each of them observes the shutdown flag
                lock (m_signal) {
                    Monitor.PulseAll(m_signal);
                }
                GC.SuppressFinalize(this);
            }
        }
    }

    /// <summary>
    /// Shuts the pool down. Only the first call has an effect.
    /// </summary>
    public void Dispose() {
        Dispose(true);
    }

    ~DispatchPool() {
        Dispose(false);
    }
}
| 199 } |
