Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 17:7cd4a843b4e4 promises
Improved worker pool
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:25:42 +0400 |
parents | 5a4b735ba669 |
children | 1c3b3d518480 |
comparison
equal
deleted
inserted
replaced
16:5a4b735ba669 | 17:7cd4a843b4e4 |
---|---|
60 get { | 60 get { |
61 return m_exitRequired != 0; | 61 return m_exitRequired != 0; |
62 } | 62 } |
63 } | 63 } |
64 | 64 |
65 bool StartWorker() { | 65 protected abstract bool TryDequeue(out TUnit unit); |
66 | |
67 protected virtual bool ExtendPool() { | |
68 if (m_suspended > 0) { | |
69 m_hasTasks.Set(); | |
70 return true; | |
71 } else | |
72 return StartWorker(); | |
73 } | |
74 | |
75 /// <summary> | |
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | |
77 /// </summary> | |
78 protected void WakePool() { | |
79 m_hasTasks.Set(); // wake sleeping thread; | |
80 | |
81 if (AllocateThreadSlot(1)) { | |
82 var worker = new Thread(this.Worker); | |
83 worker.IsBackground = true; | |
84 worker.Start(); | |
85 } | |
86 } | |
87 | |
88 protected virtual void Suspend() { | |
89 m_hasTasks.WaitOne(); | |
90 } | |
91 | |
92 #region thread slots traits | |
93 | |
94 bool AllocateThreadSlot() { | |
66 int current; | 95 int current; |
67 // use spins to allocate slot for the new thread | 96 // use spins to allocate slot for the new thread |
68 do { | 97 do { |
69 current = m_runningThreads; | 98 current = m_runningThreads; |
70 if (current >= m_maxThreads || m_exitRequired != 0) | 99 if (current >= m_maxThreads || m_exitRequired != 0) |
71 // no more slots left or the pool has been disposed | 100 // no more slots left or the pool has been disposed |
72 return false; | 101 return false; |
73 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | 102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); |
74 | 103 |
75 m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); | 104 UpdateMaxThreads(current + 1); |
76 | |
77 // slot successfully allocated | |
78 | |
79 var worker = new Thread(this.Worker); | |
80 worker.IsBackground = true; | |
81 worker.Start(); | |
82 | 105 |
83 return true; | 106 return true; |
84 } | 107 } |
85 | 108 |
86 protected abstract bool TryDequeue(out TUnit unit); | 109 bool AllocateThreadSlot(int desired) { |
87 | 110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) |
88 protected virtual void WakeNewWorker(bool extend) { | 111 return false; |
89 if (m_suspended > 0) | 112 |
90 m_hasTasks.Set(); | 113 UpdateMaxThreads(desired); |
91 else | 114 |
92 StartWorker(); | 115 return true; |
116 } | |
117 | |
118 bool ReleaseThreadSlot(out bool last) { | |
119 last = false; | |
120 int current; | |
121 // use spins to release slot for the new thread | |
122 do { | |
123 current = m_runningThreads; | |
124 if (current <= m_minThreads && m_exitRequired == 0) | |
125 // the thread is reserved | |
126 return false; | |
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); | |
128 | |
129 last = (current == 1); | |
130 | |
131 return true; | |
93 } | 132 } |
94 | 133 |
95 /// <summary> | 134 /// <summary> |
96 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока | 135 /// releases thread slot unconditionally, used during cleanup |
97 /// </summary> | 136 /// </summary> |
98 protected void StartIfIdle() { | 137 /// <returns>true - no more threads left</returns> |
99 int threads; | 138 bool ReleaseThreadSlotAnyway() { |
100 do { | 139 var left = Interlocked.Decrement(ref m_runningThreads); |
101 | 140 return left == 0; |
102 } | 141 } |
103 } | 142 |
104 | 143 void UpdateMaxThreads(int count) { |
105 protected virtual void Suspend() { | 144 int max; |
106 m_hasTasks.WaitOne(); | 145 do { |
146 max = m_maxRunningThreads; | |
147 if (max >= count) | |
148 break; | |
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); | |
150 } | |
151 | |
152 #endregion | |
153 | |
154 bool StartWorker() { | |
155 if (AllocateThreadSlot()) { | |
156 // slot successfully allocated | |
157 var worker = new Thread(this.Worker); | |
158 worker.IsBackground = true; | |
159 worker.Start(); | |
160 | |
161 return true; | |
162 } else { | |
163 return false; | |
164 } | |
107 } | 165 } |
108 | 166 |
109 bool FetchTask(out TUnit unit) { | 167 bool FetchTask(out TUnit unit) { |
110 do { | 168 do { |
111 // exit if requested | 169 // exit if requested |
112 if (m_exitRequired != 0) { | 170 if (m_exitRequired != 0) { |
113 // release the thread slot | 171 // release the thread slot |
114 var running = Interlocked.Decrement(ref m_runningThreads); | 172 if (ReleaseThreadSlotAnyway()) // it was the last worker |
115 if (running == 0) // it was the last worker | |
116 m_hasTasks.Dispose(); | 173 m_hasTasks.Dispose(); |
117 else | 174 else |
118 m_hasTasks.Set(); // release next worker | 175 m_hasTasks.Set(); // wake next worker |
119 unit = default(TUnit); | 176 unit = default(TUnit); |
120 return false; | 177 return false; |
121 } | 178 } |
122 | 179 |
123 // fetch task | 180 // fetch task |
124 if (TryDequeue(out unit)) { | 181 if (TryDequeue(out unit)) { |
125 WakeNewWorker(true); | 182 ExtendPool(); |
126 return true; | 183 return true; |
127 } | 184 } |
128 | 185 |
129 //no tasks left, exit if the thread is no longer needed | 186 //no tasks left, exit if the thread is no longer needed |
130 int runningThreads; | 187 bool last; |
131 bool exit = true; | 188 if (ReleaseThreadSlot(out last)) { |
132 do { | 189 if (last && m_hasTasks.WaitOne(0)) { |
133 runningThreads = m_runningThreads; | 190 if (AllocateThreadSlot(1)) |
134 if (runningThreads <= m_minThreads) { | 191 continue; // spin again... |
135 // check wheather this is the last thread and we have tasks | 192 else |
136 | 193 // we failed to reallocate slot for this thread |
137 exit = false; | 194 // therefore we need to release the event |
138 break; | 195 m_hasTasks.Set(); |
139 } | 196 } |
140 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); | 197 |
141 | |
142 if (exit) { | |
143 return false; | 198 return false; |
144 } | 199 } |
145 | 200 |
146 // entering suspend state | 201 // entering suspend state |
147 Interlocked.Increment(ref m_suspended); | 202 Interlocked.Increment(ref m_suspended); |