Mercurial > pub > ImplabNet
view Implab/Parallels/ArrayTraits.cs @ 127:d86da8d2d4c3 v2
fixed AsyncQueue iterator
author | cin |
---|---|
date | Tue, 27 Jan 2015 18:18:29 +0300 |
parents | a336cb13c6a9 |
children | 706fccb85524 |
line wrap: on
line source
using Implab.Diagnostics; using System; using System.Diagnostics; using System.Threading; namespace Implab.Parallels { public static class ArrayTraits { class ArrayIterator<TSrc> : DispatchPool<int> { readonly Action<TSrc> m_action; readonly TSrc[] m_source; readonly Promise<int> m_promise = new Promise<int>(); readonly LogicalOperation m_logicalOperation; int m_pending; int m_next; public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) : base(threads) { Debug.Assert(source != null); Debug.Assert(action != null); m_logicalOperation = TraceContext.Instance.CurrentOperation; m_next = 0; m_source = source; m_pending = source.Length; m_action = action; m_promise.On(Dispose, PromiseEventType.All); InitPool(); } public Promise<int> Promise { get { return m_promise; } } protected override void Worker() { TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); try { base.Worker(); } finally { TraceContext.Instance.Leave(); } } protected override bool TryDequeue(out int unit) { unit = Interlocked.Increment(ref m_next) - 1; return unit < m_source.Length; } protected override void InvokeUnit(int unit) { try { m_action(m_source[unit]); var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_source.Length); } catch (Exception e) { m_promise.Reject(e); } } } class ArrayMapper<TSrc, TDst>: DispatchPool<int> { readonly Func<TSrc, TDst> m_transform; readonly TSrc[] m_source; readonly TDst[] m_dest; readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); readonly LogicalOperation m_logicalOperation; int m_pending; int m_next; public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) : base(threads) { Debug.Assert (source != null); Debug.Assert( transform != null); m_next = 0; m_source = source; m_dest = new TDst[source.Length]; m_pending = source.Length; m_transform = transform; m_logicalOperation = TraceContext.Instance.CurrentOperation; m_promise.On(Dispose, PromiseEventType.All); InitPool(); } public Promise<TDst[]> Promise { get { return m_promise; } } protected override void Worker() { TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); try { base.Worker(); } finally { TraceContext.Instance.Leave(); } } protected override bool TryDequeue(out int unit) { unit = Interlocked.Increment(ref m_next) - 1; return unit < m_source.Length; } protected override void InvokeUnit(int unit) { try { m_dest[unit] = m_transform(m_source[unit]); var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_dest); } catch (Exception e) { m_promise.Reject(e); } } } public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { if (source == null) throw new ArgumentNullException("source"); if (transform == null) throw new ArgumentNullException("transform"); var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); return mapper.Promise; } public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { if (source == null) throw new ArgumentNullException("source"); if (action == null) throw new ArgumentNullException("action"); var iter = new ArrayIterator<TSrc>(source, action, threads); return iter.Promise; } public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) { if (source == null) throw new ArgumentNullException("source"); if (transform == null) throw new ArgumentNullException("transform"); if (threads <= 0) throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); if (source.Length == 0) return Promise<TDst[]>.ResultToPromise(new TDst[0]); var promise = new Promise<TDst[]>(); var res = new TDst[source.Length]; var pending = source.Length; object locker = new object(); int slots = threads; // Analysis disable AccessToDisposedClosure AsyncPool.RunThread<int>(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation var idx = i; if (Interlocked.Decrement(ref slots) < 0) { lock(locker) { while(slots < 0) Monitor.Wait(locker); } } try { transform(source[i]) .On( x => { Interlocked.Increment(ref slots); lock (locker) { Monitor.Pulse(locker); } }) .On( x => { res[idx] = x; var left = Interlocked.Decrement(ref pending); if (left == 0) promise.Resolve(res); }, promise.Reject ); } catch (Exception e) { promise.Reject(e); } } return 0; }); return promise; } } }