Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 269:ff581cff7003 v3
Working on Unity container xml configuration
| author | cin | 
|---|---|
| date | Tue, 24 Apr 2018 01:46:02 +0300 | 
| parents | f803565868a4 | 
| children | 
| rev | line source | 
|---|---|
| 15 | 1 using System; | 
| 2 using System.Threading; | |
| 3 | |
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads which pull units of
    /// work through <see cref="TryDequeue"/> and run them with
    /// <see cref="InvokeUnit"/>.
    /// </summary>
    /// <remarks>
    /// The number of workers is kept between the minimum and the maximum limits
    /// passed to the constructor. A worker which finds no work during the
    /// release timeout gives up its slot and exits, unless the slot is reserved
    /// by the minimum limit.
    /// </remarks>
    /// <typeparam name="TUnit">The type of the unit of work.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // how long (ms) an idle worker waits for a new unit before trying to exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum size the pool has reached so far
        int m_exit; // 1 when the pool is shutting down, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        /// <summary>
        /// Creates the pool with the specified limits. No threads are started
        /// here, see <see cref="InitPool"/>.
        /// </summary>
        /// <param name="min">The number of workers which are never released; clamped to <paramref name="max"/>.</param>
        /// <param name="max">The maximum number of workers.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates the pool with the fixed number of workers.
        /// </summary>
        /// <param name="threads">Used both as the minimum and the maximum limit.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved workers and the maximum limit
        /// equal to <see cref="Environment.ProcessorCount"/>.
        /// </summary>
        protected DispatchPool() {
            m_minThreadsLimit = 0;
            m_maxThreadsLimit = Environment.ProcessorCount;
        }

        /// <summary>
        /// Starts the workers required by the minimum limit.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated worker slots.
        /// </summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>
        /// The largest number of simultaneously allocated worker slots
        /// observed so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once <see cref="Dispose()"/> has been called.
        /// </summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Tries to get the next unit of work without blocking.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit has been dequeued.</returns>
        /// <remarks>
        /// Called by the workers both with and without the internal signal
        /// lock held.
        /// </remarks>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Gets the next unit of work, waiting on the signal for at most
        /// <paramref name="timeout"/> milliseconds.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The overall wait limit in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit has been dequeued; <c>false</c> if the wait
        /// has timed out or the pool is shutting down and nothing has been
        /// dequeued.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path, doesn't touch the lock
            if (TryDequeue(out unit))
                return true;

            lock (m_signal) {
                bool dequeued;
                while (!(dequeued = TryDequeue(out unit)) && m_exit == 0) {
                    // Environment.TickCount wraps around, the subtraction gives
                    // the correct remaining time only with wrapping arithmetic
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }

                // either a unit is dequeued or the pool is terminating,
                // pass the signal to the next waiting thread
                Monitor.Pulse(m_signal);

                // a unit which is already taken from the queue must be
                // returned even during the shutdown, otherwise it is lost
                return dequeued;
            }
        }

        /// <summary>
        /// Pulses one thread waiting on the pool signal. A worker woken up
        /// this way pulses the signal again when it leaves the wait loop.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Allocates a slot for a new worker.
        /// </summary>
        /// <returns>
        /// <c>false</c> if the maximum limit is reached or the pool is disposed.
        /// </returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Allocates a slot only if it raises the pool size exactly to
        /// <paramref name="desired"/>.
        /// </summary>
        /// <param name="desired">The pool size expected after the allocation.</param>
        /// <returns><c>false</c> if the current size isn't <c>desired - 1</c>.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Releases the slot of the calling worker.
        /// </summary>
        /// <param name="last"><c>true</c> if the released slot was the last one.</param>
        /// <returns>
        /// <c>false</c> if the slot is reserved by the minimum limit and the
        /// pool isn't shutting down, the worker should keep running then.
        /// </returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the current thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises <c>m_maxRunningThreads</c> to <paramref name="count"/> if it
        /// is currently smaller.
        /// </summary>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Starts a new background worker thread if a slot is available.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started.</returns>
        protected bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            try {
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the thread hasn't been started, give the slot back,
                // otherwise the capacity of the pool is reduced forever
                Interlocked.Decrement(ref m_threads);
                throw;
            }

            return true;
        }

        /// <summary>
        /// Executes the unit of work, called on a worker thread.
        /// </summary>
        /// <param name="unit">The unit obtained from <see cref="TryDequeue"/>.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of the worker thread: runs units until no work arrives
        /// during the release timeout and the slot can be released.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            while (true) {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }

                if (!ReleaseThreadSlot(out last))
                    // the slot is reserved, keep waiting for the work
                    continue;

                // the slot is released, but the queue may be not empty: if this
                // was the last worker nobody else will handle the pending unit
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    if (AllocateThreadSlot(1))
                        continue;
                    // the pool size has changed meanwhile, i.e. another worker
                    // has taken a slot and the pool is alive, safe to exit
                }
                break;
            }
        }

        /// <summary>
        /// Marks the pool as disposed and wakes a waiting worker. Only the
        /// first call with <paramref name="disposing"/> set to <c>true</c>
        /// has an effect.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>, <c>false</c>
        /// when called from the finalizer.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads, each awakened worker pulses the next one
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Shuts the pool down, see <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}
