Mercurial > pub > ImplabNet
annotate Implab/Parallels/DispatchPool.cs @ 269:ff581cff7003 v3
Working on Unity container xml configuration
| author | cin |
|---|---|
| date | Tue, 24 Apr 2018 01:46:02 +0300 |
| parents | f803565868a4 |
| children |
| rev | line source |
|---|---|
| 15 | 1 using System; |
| 2 using System.Threading; | |
| 3 | |
namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads which pull units of
    /// work from a queue supplied by the derived class and execute them.
    /// </summary>
    /// <remarks>
    /// The derived class supplies the queue through <c>TryDequeue</c> and the
    /// execution logic through <c>InvokeUnit</c>. The number of workers is kept
    /// between the minimum and the maximum limits passed to the constructor;
    /// a worker which stays idle longer than the release timeout exits unless
    /// it is needed to keep the minimum. The constructors do not start any
    /// threads, see <c>InitPool</c> and <c>StartWorker</c>.
    /// </remarks>
    /// <typeparam name="TUnit">The type of the unit of work.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // milliseconds an idle worker waits for new units before it tries to exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the maximum size the pool has reached so far
        int m_exit; // 1 when the pool is shutting down, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        /// <summary>
        /// Creates the pool with the specified limits of the number of threads.
        /// </summary>
        /// <param name="min">The number of threads which are kept even when idle; values above <paramref name="max"/> are lowered to <paramref name="max"/>.</param>
        /// <param name="max">The maximum number of threads, must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates the pool with the same minimum and maximum number of threads.
        /// </summary>
        /// <param name="threads">The number of threads.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates the pool with no reserved threads and the maximum equal to
        /// the number of processors.
        /// </summary>
        protected DispatchPool() {
            m_minThreadsLimit = 0;
            m_maxThreadsLimit = Environment.ProcessorCount;
        }

        /// <summary>
        /// Starts workers up to the minimum number of threads.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated thread slots.
        /// </summary>
        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }

        /// <summary>
        /// The maximum number of thread slots which were allocated at the same time.
        /// </summary>
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// Indicates that <c>Dispose</c> has been called for this pool.
        /// </summary>
        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        /// <summary>
        /// Tries to take the next unit of work from the queue without blocking.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit has been dequeued.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit of work, waiting for it up to the specified timeout.
        /// </summary>
        /// <param name="unit">The dequeued unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit has been dequeued; <c>false</c> if the wait has
        /// timed out or the pool is shutting down and there was nothing to dequeue.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path, doesn't take the lock
            if (TryDequeue(out unit))
                return true;

            lock (m_signal) {
                bool dequeued;
                while (!(dequeued = TryDequeue(out unit)) && m_exit == 0) {
                    // Environment.TickCount wraps around, the subtraction is
                    // correct only with the wrapping arithmetic
                    int remaining = unchecked(ts + timeout - Environment.TickCount);
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }

                // either a unit has been dequeued or the pool is terminating,
                // pass the signal to the next waiting thread
                Monitor.Pulse(m_signal);

                // a unit which has already been taken from the queue must be
                // processed even when the pool is shutting down, otherwise
                // it would be lost
                return dequeued;
            }
        }

        /// <summary>
        /// Wakes up one thread waiting for a unit of work, if there is any.
        /// </summary>
        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Reserves a slot for a new thread.
        /// </summary>
        /// <returns><c>false</c> if the maximum number of threads is reached or the pool is shutting down.</returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Reserves a slot only if it makes the number of threads equal to
        /// <paramref name="desired"/>, i.e. the current number is <c>desired - 1</c>.
        /// </summary>
        /// <param name="desired">The number of threads after the allocation.</param>
        /// <returns><c>true</c> if the slot has been allocated.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Releases the slot of the calling worker.
        /// </summary>
        /// <param name="last"><c>true</c> if the released slot was the last one, i.e. the number of threads became zero.</param>
        /// <returns>
        /// <c>false</c> if the slot is reserved: the pool isn't shutting down
        /// and the number of threads doesn't exceed the minimum.
        /// </returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the current thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// Raises the recorded maximum number of threads up to <paramref name="count"/>
        /// if it is lower.
        /// </summary>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while (max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Starts a new background worker thread if there is a free slot.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started.</returns>
        protected bool StartWorker() {
            if (AllocateThreadSlot()) {
                // slot successfully allocated
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();

                return true;
            }
            return false;
        }

        /// <summary>
        /// Executes the specified unit of work, called on a worker thread.
        /// </summary>
        /// <param name="unit">The unit taken from the queue.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: executes units until there is nothing
        /// to do for longer than the release timeout or the pool is shutting
        /// down, then releases the thread slot and exits.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }
                if (!ReleaseThreadSlot(out last))
                    // the slot is reserved, keep waiting for units
                    continue;
                // the slot is released, but the queue may be not empty: a unit
                // could be enqueued while this thread was leaving
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    // try to return to the pool as its only thread
                    if (AllocateThreadSlot(1))
                        continue;
                    // the number of threads is no longer zero, i.e. a slot has
                    // been allocated after this thread released its own, so it
                    // is safe to exit
                }
                break;
            } while (true);
        }

        /// <summary>
        /// Marks the pool as shutting down and wakes up a waiting worker;
        /// does nothing when <paramref name="disposing"/> is <c>false</c>
        /// or the pool is already disposed.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>, <c>false</c> when called from the finalizer.</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake a sleeping thread, each awakened worker pulses
                    // the next one before it leaves the wait
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Shuts the pool down.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}
