comparison Implab/Parallels/ArrayTraits.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents c761fc982e1d
children ce0171cacec4
comparison
equal deleted inserted replaced
79:05e6468f066f 80:4f20870d0816
151 151
152 var promise = new Promise<TDst[]>(); 152 var promise = new Promise<TDst[]>();
153 var res = new TDst[source.Length]; 153 var res = new TDst[source.Length];
154 var pending = source.Length; 154 var pending = source.Length;
155 155
156 var semaphore = new Semaphore(threads, threads); 156 object locker = new object();
157 int slots = threads;
157 158
158 // Analysis disable AccessToDisposedClosure 159 // Analysis disable AccessToDisposedClosure
159 AsyncPool.InvokeNewThread(() => { 160 AsyncPool.InvokeNewThread(() => {
160 for (int i = 0; i < source.Length; i++) { 161 for (int i = 0; i < source.Length; i++) {
161 if(promise.IsResolved) 162 if(promise.IsResolved)
162 break; // stop processing in case of error or cancellation 163 break; // stop processing in case of error or cancellation
163 var idx = i; 164 var idx = i;
164 165
165 semaphore.WaitOne(); 166 lock(locker) {
167 while(slots == 0)
168 Monitor.Wait(locker);
169 slots--;
170 }
166 try { 171 try {
167 var p1 = transform(source[i]); 172 transform(source[i])
168 p1.Anyway(() => semaphore.Release()); 173 .Anyway(() => {
169 p1.Then( 174 lock(locker) {
170 x => { 175 slots ++;
171 res[idx] = x; 176 Monitor.Pulse(locker);
172 var left = Interlocked.Decrement(ref pending); 177 }
173 if (left == 0) 178 })
174 promise.Resolve(res); 179 .Last(
175 }, 180 x => {
176 e => { 181 res[idx] = x;
177 promise.Reject(e); 182 var left = Interlocked.Decrement(ref pending);
178 throw new TransientPromiseException(e); 183 if (left == 0)
179 } 184 promise.Resolve(res);
180 ); 185 },
186 e => promise.Reject(e)
187 );
181 188
182 } catch (Exception e) { 189 } catch (Exception e) {
183 promise.Reject(e); 190 promise.Reject(e);
184 } 191 }
185 } 192 }
186 return 0; 193 return 0;
187 }); 194 });
188 195
189 return promise.Anyway(semaphore.Dispose); 196 return promise;
190 } 197 }
191 198
192 } 199 }
193 } 200 }