view Implab/Parallels/ArrayTraits.cs @ 196:40d7fed4a09e

Fixed promise chaining behavior: the error handler no longer handles exceptions thrown by the result or cancellation handlers; these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents 706fccb85524
children cbe10ac0731e
line wrap: on
line source

using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        /// <summary>
        /// Dispatch pool which applies an action to every element of an array.
        /// The work units are array indexes, they are handed out to the workers
        /// through an atomically incremented counter.
        /// </summary>
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly TSrc[] m_source;
            readonly Action<TSrc> m_action;
            readonly LogicalOperation m_logicalOperation;
            readonly Promise<int> m_promise = new Promise<int>();

            // the counter used to hand out the indexes of the elements
            int m_next;
            // the number of elements which aren't processed successfully yet
            int m_pending;

            /// <summary>
            /// Creates the iterator and starts the pool.
            /// </summary>
            /// <param name="source">The array to iterate over.</param>
            /// <param name="action">The action invoked for every element.</param>
            /// <param name="threads">The value passed to the base pool constructor.</param>
            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_source = source;
                m_action = action;
                m_pending = source.Length;
                m_next = 0;

                // remember the current logical operation, the workers will enter it
                m_logicalOperation = TraceContext.Instance.CurrentOperation;

                // Dispose is subscribed to all events of the promise
                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            /// <summary>
            /// The promise which is resolved with the length of the source array
            /// when the last element is processed, or rejected when the action fails.
            /// </summary>
            public Promise<int> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Runs the base worker inside the logical operation captured by the constructor.
            /// </summary>
            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                } finally {
                    // the operation is left even if the base worker throws
                    TraceContext.Instance.Leave();
                }
            }

            /// <summary>
            /// Claims the next index of the source array.
            /// </summary>
            /// <param name="unit">The claimed index.</param>
            /// <returns><c>true</c> if the claimed index is inside the array.</returns>
            protected override bool TryDequeue(out int unit) {
                var claimed = Interlocked.Increment(ref m_next);
                unit = claimed - 1;
                return unit < m_source.Length;
            }

            /// <summary>
            /// Invokes the action for the element with the specified index.
            /// </summary>
            /// <param name="unit">The index of the element.</param>
            protected override void InvokeUnit(int unit) {
                try {
                    var item = m_source[unit];
                    m_action(item);

                    // the worker which completes the last element resolves the promise
                    if (Interlocked.Decrement(ref m_pending) == 0)
                        m_promise.Resolve(m_source.Length);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Dispatch pool which transforms every element of an array and stores
        /// the result in the destination array at the index of the source element.
        /// The work units are array indexes, they are handed out to the workers
        /// through an atomically incremented counter.
        /// </summary>
        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Func<TSrc, TDst> m_transform;
            readonly LogicalOperation m_logicalOperation;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

            // the counter used to hand out the indexes of the elements
            int m_next;
            // the number of elements which aren't transformed successfully yet
            int m_pending;

            /// <summary>
            /// Creates the mapper and starts the pool.
            /// </summary>
            /// <param name="source">The array to transform.</param>
            /// <param name="transform">The function invoked for every element.</param>
            /// <param name="threads">The value passed to the base pool constructor.</param>
            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(transform != null);

                m_source = source;
                m_transform = transform;
                m_dest = new TDst[source.Length];
                m_pending = source.Length;
                m_next = 0;

                // remember the current logical operation, the workers will enter it
                m_logicalOperation = TraceContext.Instance.CurrentOperation;

                // Dispose is subscribed to all events of the promise
                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            /// <summary>
            /// The promise which is resolved with the destination array when the last
            /// element is transformed, or rejected when the transformation fails.
            /// </summary>
            public Promise<TDst[]> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Runs the base worker inside the logical operation captured by the constructor.
            /// </summary>
            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                } finally {
                    // the operation is left even if the base worker throws
                    TraceContext.Instance.Leave();
                }
            }

            /// <summary>
            /// Claims the next index of the source array.
            /// </summary>
            /// <param name="unit">The claimed index.</param>
            /// <returns><c>true</c> if the claimed index is inside the array.</returns>
            protected override bool TryDequeue(out int unit) {
                var claimed = Interlocked.Increment(ref m_next);
                unit = claimed - 1;
                return unit < m_source.Length;
            }

            /// <summary>
            /// Transforms the element with the specified index and stores the result.
            /// </summary>
            /// <param name="unit">The index of the element.</param>
            protected override void InvokeUnit(int unit) {
                try {
                    var item = m_source[unit];
                    m_dest[unit] = m_transform(item);

                    // the worker which completes the last element resolves the promise
                    if (Interlocked.Decrement(ref m_pending) == 0)
                        m_promise.Resolve(m_dest);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Transforms the elements of the array in parallel using a dispatch pool.
        /// </summary>
        /// <param name="source">The array to transform.</param>
        /// <param name="transform">The function applied to every element.</param>
        /// <param name="threads">The threads parameter of the pool which does the work.</param>
        /// <returns>
        /// The promise of the array of results, every result is stored at the index
        /// of its source element. The promise is rejected if the transformation throws.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
        /// </exception>
        public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");

            // The mapper resolves its promise when the last unit is completed, with an empty
            // array there are no units and the promise would never be completed, so the
            // result is returned right away.
            // NOTE(review): threads is checked only by the pool constructor, so it isn't
            // validated on this path - confirm whether an explicit check like the one in
            // ChainedMap is wanted here.
            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        /// <summary>
        /// Invokes the action for every element of the array in parallel using a dispatch pool.
        /// </summary>
        /// <param name="source">The array to iterate over.</param>
        /// <param name="action">The action invoked for every element.</param>
        /// <param name="threads">The threads parameter of the pool which does the work.</param>
        /// <returns>
        /// The promise which is resolved with the length of the array when all elements are
        /// processed. The promise is rejected if the action throws.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
        /// </exception>
        public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");

            // The iterator resolves its promise when the last unit is completed, with an empty
            // array there are no units and the promise would never be completed, so the
            // result (the number of elements, i.e. zero) is returned right away.
            // NOTE(review): threads is checked only by the pool constructor, so it isn't
            // validated on this path - confirm whether an explicit check like the one in
            // ChainedMap is wanted here.
            if (source.Length == 0)
                return Promise<int>.FromResult(0);

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }

        /// <summary>
        /// Starts an asynchronous operation for every element of the array and collects
        /// the results. The operations are started one by one from a dedicated thread,
        /// a new operation isn't started while <paramref name="threads"/> operations
        /// started before are still waiting for their results.
        /// </summary>
        /// <param name="source">The array to transform.</param>
        /// <param name="transform">The function which starts the operation for an element.</param>
        /// <param name="threads">The maximum number of simultaneously running operations.</param>
        /// <returns>
        /// The promise of the array of results, every result is stored at the index of its
        /// source element. The promise is rejected when <paramref name="transform"/> throws
        /// or one of the operations reports an error.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="threads"/> is less than or equal to zero.
        /// </exception>
        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var promise = new Promise<TDst[]>();
            var res = new TDst[source.Length];
            var pending = source.Length;

            object locker = new object();
            int slots = threads;

            // Only a successful operation returns its slot, so when an operation fails the
            // dispatching thread may be waiting for a slot which will never be released.
            // Wake it when the resulting promise is completed in any way to let it exit.
            promise.On(() => {
                lock (locker) {
                    Monitor.PulseAll(locker);
                }
            }, PromiseEventType.All);

            // Analysis disable AccessToDisposedClosure
            AsyncPool.RunThread<int>(() => {
                for (int i = 0; i < source.Length; i++) {
                    if(promise.IsResolved)
                        break; // stop processing in case of error or cancellation

                    // the handlers below are invoked later, they need their own copy of the index
                    var idx = i;

                    // take a slot, wait for a free one if all of them are occupied
                    if (Interlocked.Decrement(ref slots) < 0) {
                        lock(locker) {
                            while(slots < 0 && !promise.IsResolved)
                                Monitor.Wait(locker);
                        }

                        // the thread may be woken by the completion of the resulting promise,
                        // in that case there is no reason to start more operations
                        if (promise.IsResolved)
                            break;
                    }

                    try {
                        transform(source[i])
                            .On( x => {
                                // return the slot and wake the dispatching thread
                                Interlocked.Increment(ref slots);
                                lock (locker) {
                                    Monitor.Pulse(locker);
                                }
                            })
                            .On(
                                x => {
                                    res[idx] = x;
                                    // the last completed operation resolves the resulting promise
                                    var left = Interlocked.Decrement(ref pending);
                                    if (left == 0)
                                        promise.Resolve(res);
                                },
                                promise.Reject
                            );

                    } catch (Exception e) {
                        promise.Reject(e);
                    }
                }
                return 0;
            });

            return promise;
        }

    }
}