Mercurial > pub > ImplabNet
diff Implab/Parallels/ArrayTraits.cs @ 75:4439140706d0 v2
major refactoring, added tasks support
author | cin |
---|---|
date | Wed, 10 Sep 2014 11:17:37 +0400 |
parents | d67b95eddaf4 |
children | c761fc982e1d |
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs Mon Sep 08 17:40:46 2014 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Wed Sep 10 11:17:37 2014 +0400 @@ -29,8 +29,7 @@ m_pending = source.Length; m_action = action; - m_promise.Anyway(() => Dispose()); - m_promise.Cancelled(() => Dispose()); + m_promise.Finally(Dispose); InitPool(); } @@ -48,7 +47,7 @@ protected override bool TryDequeue(out int unit) { unit = Interlocked.Increment(ref m_next) - 1; - return unit >= m_source.Length ? false : true; + return unit < m_source.Length; } protected override void InvokeUnit(int unit) { @@ -86,8 +85,7 @@ m_transform = transform; m_traceContext = TraceContext.Snapshot(); - m_promise.Anyway(() => Dispose()); - m_promise.Cancelled(() => Dispose()); + m_promise.Finally(Dispose); InitPool(); } @@ -157,16 +155,17 @@ var semaphore = new Semaphore(threads, threads); + // Analysis disable AccessToDisposedClosure AsyncPool.InvokeNewThread(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation var idx = i; + semaphore.WaitOne(); try { var p1 = transform(source[i]); - p1.Anyway(() => semaphore.Release()); - p1.Cancelled(() => semaphore.Release()); + p1.Finally(() => semaphore.Release()); p1.Then( x => { res[idx] = x; @@ -187,7 +186,7 @@ return 0; }); - return promise.Anyway(() => semaphore.Dispose()); + return promise.Finally(semaphore.Dispose); } }