Mercurial > pub > ImplabNet
diff Implab/Parallels/ArrayTraits.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | 706fccb85524 |
children | cbe10ac0731e |
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs Wed Sep 03 18:34:02 2014 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Fri Apr 22 13:10:34 2016 +0300 @@ -1,9 +1,6 @@ using Implab.Diagnostics; using System; -using System.Collections.Generic; using System.Diagnostics; -using System.Linq; -using System.Text; using System.Threading; namespace Implab.Parallels { @@ -12,7 +9,7 @@ readonly Action<TSrc> m_action; readonly TSrc[] m_source; readonly Promise<int> m_promise = new Promise<int>(); - readonly TraceContext m_traceContext; + readonly LogicalOperation m_logicalOperation; int m_pending; int m_next; @@ -23,14 +20,13 @@ Debug.Assert(source != null); Debug.Assert(action != null); - m_traceContext = TraceContext.Snapshot(); + m_logicalOperation = TraceContext.Instance.CurrentOperation; m_next = 0; m_source = source; m_pending = source.Length; m_action = action; - m_promise.Anyway(() => Dispose()); - m_promise.Cancelled(() => Dispose()); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -42,13 +38,17 @@ } protected override void Worker() { - TraceContext.Fork(m_traceContext); - base.Worker(); + TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); + try { + base.Worker(); + } finally { + TraceContext.Instance.Leave(); + } } protected override bool TryDequeue(out int unit) { unit = Interlocked.Increment(ref m_next) - 1; - return unit >= m_source.Length ? false : true; + return unit < m_source.Length; } protected override void InvokeUnit(int unit) { @@ -68,7 +68,7 @@ readonly TSrc[] m_source; readonly TDst[] m_dest; readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); - readonly TraceContext m_traceContext; + readonly LogicalOperation m_logicalOperation; int m_pending; int m_next; @@ -84,10 +84,9 @@ m_dest = new TDst[source.Length]; m_pending = source.Length; m_transform = transform; - m_traceContext = TraceContext.Snapshot(); + m_logicalOperation = TraceContext.Instance.CurrentOperation; - m_promise.Anyway(() => Dispose()); - m_promise.Cancelled(() => Dispose()); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -99,13 +98,17 @@ } protected override void Worker() { - TraceContext.Fork(m_traceContext); - base.Worker(); + TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); + try { + base.Worker(); + } finally { + TraceContext.Instance.Leave(); + } } protected override bool TryDequeue(out int unit) { unit = Interlocked.Increment(ref m_next) - 1; - return unit >= m_source.Length ? false : true; + return unit < m_source.Length; } protected override void InvokeUnit(int unit) { @@ -140,42 +143,55 @@ return iter.Promise; } - public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) { if (source == null) throw new ArgumentNullException("source"); if (transform == null) throw new ArgumentNullException("transform"); if (threads <= 0) - throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); if (source.Length == 0) - return Promise<TDst[]>.ResultToPromise(new TDst[0]); + return Promise<TDst[]>.FromResult(new TDst[0]); var promise = new Promise<TDst[]>(); var res = new TDst[source.Length]; var pending = source.Length; - var semaphore = new Semaphore(threads, threads); + object locker = new object(); + int slots = threads; - AsyncPool.InvokeNewThread(() => { + // Analysis disable AccessToDisposedClosure + AsyncPool.RunThread<int>(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation var idx = i; - semaphore.WaitOne(); + + if (Interlocked.Decrement(ref slots) < 0) { + lock(locker) { + while(slots < 0) + Monitor.Wait(locker); + } + } + try { - var p1 = transform(source[i]); - p1.Anyway(() => semaphore.Release()); - p1.Cancelled(() => semaphore.Release()); - p1.Then( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - e => promise.Reject(e) - ); + transform(source[i]) + .On( x => { + Interlocked.Increment(ref slots); + lock (locker) { + Monitor.Pulse(locker); + } + }) + .On( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + promise.Reject + ); } catch (Exception e) { promise.Reject(e); @@ -184,7 +200,7 @@ return 0; }); - return promise.Anyway(() => semaphore.Dispose()); + return promise; } }