Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | c761fc982e1d |
children | ce0171cacec4 |
comparison
legend: equal | deleted | inserted | replaced
79:05e6468f066f | 80:4f20870d0816 |
---|---|
151 | 151 |
152 var promise = new Promise<TDst[]>(); | 152 var promise = new Promise<TDst[]>(); |
153 var res = new TDst[source.Length]; | 153 var res = new TDst[source.Length]; |
154 var pending = source.Length; | 154 var pending = source.Length; |
155 | 155 |
156 var semaphore = new Semaphore(threads, threads); | 156 object locker = new object(); |
157 int slots = threads; | |
157 | 158 |
158 // Analysis disable AccessToDisposedClosure | 159 // Analysis disable AccessToDisposedClosure |
159 AsyncPool.InvokeNewThread(() => { | 160 AsyncPool.InvokeNewThread(() => { |
160 for (int i = 0; i < source.Length; i++) { | 161 for (int i = 0; i < source.Length; i++) { |
161 if(promise.IsResolved) | 162 if(promise.IsResolved) |
162 break; // stop processing in case of error or cancellation | 163 break; // stop processing in case of error or cancellation |
163 var idx = i; | 164 var idx = i; |
164 | 165 |
165 semaphore.WaitOne(); | 166 lock(locker) { |
167 while(slots == 0) | |
168 Monitor.Wait(locker); | |
169 slots--; | |
170 } | |
166 try { | 171 try { |
167 var p1 = transform(source[i]); | 172 transform(source[i]) |
168 p1.Anyway(() => semaphore.Release()); | 173 .Anyway(() => { |
169 p1.Then( | 174 lock(locker) { |
170 x => { | 175 slots ++; |
171 res[idx] = x; | 176 Monitor.Pulse(locker); |
172 var left = Interlocked.Decrement(ref pending); | 177 } |
173 if (left == 0) | 178 }) |
174 promise.Resolve(res); | 179 .Last( |
175 }, | 180 x => { |
176 e => { | 181 res[idx] = x; |
177 promise.Reject(e); | 182 var left = Interlocked.Decrement(ref pending); |
178 throw new TransientPromiseException(e); | 183 if (left == 0) |
179 } | 184 promise.Resolve(res); |
180 ); | 185 }, |
186 e => promise.Reject(e) | |
187 ); | |
181 | 188 |
182 } catch (Exception e) { | 189 } catch (Exception e) { |
183 promise.Reject(e); | 190 promise.Reject(e); |
184 } | 191 } |
185 } | 192 } |
186 return 0; | 193 return 0; |
187 }); | 194 }); |
188 | 195 |
189 return promise.Anyway(semaphore.Dispose); | 196 return promise; |
190 } | 197 } |
191 | 198 |
192 } | 199 } |
193 } | 200 } |