Mercurial > pub > ImplabNet
diff Implab/Parallels/ArrayTraits.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | c761fc982e1d |
children | ce0171cacec4 |
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Fri Sep 26 03:32:34 2014 +0400 @@ -153,7 +153,8 @@ var res = new TDst[source.Length]; var pending = source.Length; - var semaphore = new Semaphore(threads, threads); + object locker = new object(); + int slots = threads; // Analysis disable AccessToDisposedClosure AsyncPool.InvokeNewThread(() => { @@ -162,22 +163,28 @@ break; // stop processing in case of error or cancellation var idx = i; - semaphore.WaitOne(); + lock(locker) { + while(slots == 0) + Monitor.Wait(locker); + slots--; + } try { - var p1 = transform(source[i]); - p1.Anyway(() => semaphore.Release()); - p1.Then( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - e => { - promise.Reject(e); - throw new TransientPromiseException(e); - } - ); + transform(source[i]) + .Anyway(() => { + lock(locker) { + slots ++; + Monitor.Pulse(locker); + } + }) + .Last( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); } catch (Exception e) { promise.Reject(e); @@ -186,7 +193,7 @@ return 0; }); - return promise.Anyway(semaphore.Dispose); + return promise; } }