view Implab/Parallels/ArrayTraits.cs @ 187:dd4a3590f9c6 ref20160224

Reworked cancellation handling: if the cancel handler isn't specified, the OperationCanceledException will be handled by the error handler. Any unhandled OperationCanceledException will cause the promise to be cancelled.
author cin
date Tue, 19 Apr 2016 17:35:20 +0300
parents 706fccb85524
children cbe10ac0731e
line wrap: on
line source

using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        /// <summary>
        /// Dispatch pool that invokes an action for every element of an array.
        /// The units of work handed to the pool are the indexes of the array.
        /// </summary>
        /// <typeparam name="TSrc">Type of the array elements.</typeparam>
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly TSrc[] m_source;
            readonly Action<TSrc> m_action;
            readonly LogicalOperation m_logicalOperation;
            readonly Promise<int> m_promise = new Promise<int>();

            // index of the next element which will be handed out by TryDequeue
            int m_next;
            // number of elements for which the action hasn't completed yet
            int m_pending;

            /// <summary>
            /// Captures the current logical operation, subscribes the pool disposal
            /// to the promise and initializes the pool.
            /// </summary>
            /// <param name="source">The array to iterate over.</param>
            /// <param name="action">The action invoked for every element.</param>
            /// <param name="threads">Passed as is to the base pool constructor.</param>
            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_logicalOperation = TraceContext.Instance.CurrentOperation;
                m_source = source;
                m_action = action;
                m_pending = source.Length;
                m_next = 0;

                // Dispose is registered for every promise event type
                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            /// <summary>
            /// The promise resolved with the length of the source array after the
            /// last pending element is processed, or rejected when the action throws.
            /// </summary>
            public Promise<int> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Runs the base worker inside the logical operation captured by the constructor.
            /// </summary>
            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                }
                finally {
                    TraceContext.Instance.Leave();
                }
            }

            /// <summary>
            /// Atomically reserves the next array index.
            /// </summary>
            /// <param name="unit">The reserved index, it's valid only when the method returns <c>true</c>.</param>
            /// <returns><c>true</c> if the reserved index is inside the array.</returns>
            protected override bool TryDequeue(out int unit) {
                var index = Interlocked.Increment(ref m_next) - 1;
                unit = index;
                return index < m_source.Length;
            }

            /// <summary>
            /// Invokes the action for the element with the specified index.
            /// </summary>
            /// <param name="unit">The index of the element in the source array.</param>
            protected override void InvokeUnit(int unit) {
                try {
                    m_action(m_source[unit]);

                    // the worker which completes the last pending element resolves the promise
                    if (Interlocked.Decrement(ref m_pending) == 0)
                        m_promise.Resolve(m_source.Length);
                }
                catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Dispatch pool that transforms every element of an array and stores the
        /// results in a new array at the same positions. The units of work handed
        /// to the pool are the indexes of the source array.
        /// </summary>
        /// <typeparam name="TSrc">Type of the source elements.</typeparam>
        /// <typeparam name="TDst">Type of the transformed elements.</typeparam>
        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Func<TSrc, TDst> m_transform;
            readonly LogicalOperation m_logicalOperation;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

            // index of the next element which will be handed out by TryDequeue
            int m_next;
            // number of elements which haven't been transformed yet
            int m_pending;

            /// <summary>
            /// Allocates the destination array, captures the current logical operation,
            /// subscribes the pool disposal to the promise and initializes the pool.
            /// </summary>
            /// <param name="source">The array to transform.</param>
            /// <param name="transform">The function applied to every element.</param>
            /// <param name="threads">Passed as is to the base pool constructor.</param>
            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(transform != null);

                var length = source.Length;

                m_next = 0;
                m_source = source;
                m_dest = new TDst[length];
                m_pending = length;
                m_transform = transform;
                m_logicalOperation = TraceContext.Instance.CurrentOperation;

                // Dispose is registered for every promise event type
                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            /// <summary>
            /// The promise resolved with the destination array after the last pending
            /// element is transformed, or rejected when the transformation throws.
            /// </summary>
            public Promise<TDst[]> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Runs the base worker inside the logical operation captured by the constructor.
            /// </summary>
            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                }
                finally {
                    TraceContext.Instance.Leave();
                }
            }

            /// <summary>
            /// Atomically reserves the next array index.
            /// </summary>
            /// <param name="unit">The reserved index, it's valid only when the method returns <c>true</c>.</param>
            /// <returns><c>true</c> if the reserved index is inside the source array.</returns>
            protected override bool TryDequeue(out int unit) {
                var index = Interlocked.Increment(ref m_next) - 1;
                unit = index;
                return index < m_source.Length;
            }

            /// <summary>
            /// Transforms the element with the specified index and stores the result
            /// in the destination array at the same index.
            /// </summary>
            /// <param name="unit">The index of the element in the source array.</param>
            protected override void InvokeUnit(int unit) {
                try {
                    m_dest[unit] = m_transform(m_source[unit]);

                    // the worker which completes the last pending element resolves the promise
                    if (Interlocked.Decrement(ref m_pending) == 0)
                        m_promise.Resolve(m_dest);
                }
                catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Transforms the elements of the array in parallel.
        /// </summary>
        /// <param name="source">The array to transform.</param>
        /// <param name="transform">The function applied to every element of the array.</param>
        /// <param name="threads">The number of threads passed to the underlying pool, must be greater than zero.</param>
        /// <returns>
        /// The promise which is resolved with the array of results, the results are stored
        /// at the same indexes as the corresponding source elements. The promise is rejected
        /// when the transformation throws an exception.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is less than or equal to zero.</exception>
        public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            // validated here (like ChainedMap does) since the empty array below never reaches the pool
            // NOTE(review): the pool constructor is assumed to reject non-positive values as well - confirm
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

            // The mapper resolves its promise only from a worker which completes the last
            // pending element, with an empty array no element is ever processed and the
            // promise would stay pending, therefore the result is returned immediately.
            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        /// <summary>
        /// Invokes the action for every element of the array in parallel.
        /// </summary>
        /// <param name="source">The array to iterate over.</param>
        /// <param name="action">The action invoked for every element of the array.</param>
        /// <param name="threads">The number of threads passed to the underlying pool, must be greater than zero.</param>
        /// <returns>
        /// The promise which is resolved with the number of elements in the array after all
        /// of them are processed. The promise is rejected when the action throws an exception.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is less than or equal to zero.</exception>
        public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");
            // validated here (like ChainedMap does) since the empty array below never reaches the pool
            // NOTE(review): the pool constructor is assumed to reject non-positive values as well - confirm
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

            // The iterator resolves its promise only from a worker which completes the last
            // pending element, with an empty array no element is ever processed and the
            // promise would stay pending, therefore it's resolved immediately with zero.
            if (source.Length == 0)
                return Promise<int>.FromResult(0);

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }

        /// <summary>
        /// Transforms the elements of the array using an asynchronous transformation, the
        /// transformations are started sequentially from a dedicated thread and no more than
        /// <paramref name="threads"/> of them are allowed to be in progress at the same time.
        /// </summary>
        /// <param name="source">The array to transform.</param>
        /// <param name="transform">The function which starts the transformation of an element and returns its promise.</param>
        /// <param name="threads">The maximum number of simultaneously running transformations, must be greater than zero.</param>
        /// <returns>
        /// The promise which is resolved with the array of results, the results are stored at
        /// the same indexes as the corresponding source elements. The promise is rejected when
        /// <paramref name="transform"/> throws or the promise returned by it fails.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is less than or equal to zero.</exception>
        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var promise = new Promise<TDst[]>();
            var res = new TDst[source.Length];
            var pending = source.Length;

            object locker = new object();
            // the number of free slots, a negative value means that the scheduling thread is waiting for a slot
            int slots = threads;

            // returns the slot to the scheduling loop and wakes it up if it's waiting
            Action releaseSlot = () => {
                Interlocked.Increment(ref slots);
                lock (locker) {
                    Monitor.Pulse(locker);
                }
            };

            // Analysis disable AccessToDisposedClosure
            AsyncPool.RunThread<int>(() => {
                for (int i = 0; i < source.Length; i++) {
                    if(promise.IsResolved)
                        break; // stop processing in case of error or cancellation
                    var idx = i;

                    if (Interlocked.Decrement(ref slots) < 0) {
                        lock(locker) {
                            while(slots < 0)
                                Monitor.Wait(locker);
                        }
                        // the slot could be released by a failed transformation,
                        // don't start a new one if the operation is already completed
                        if (promise.IsResolved)
                            break;
                    }

                    try {
                        // The slot must be released on failure as well as on success, otherwise
                        // this thread will wait forever when a transformation fails while all
                        // slots are taken.
                        // NOTE(review): the cancellation is expected to be delivered to the error
                        // handler since no cancel handler is specified - confirm
                        transform(source[i])
                            .On(
                                x => {
                                    releaseSlot();
                                    res[idx] = x;
                                    var left = Interlocked.Decrement(ref pending);
                                    if (left == 0)
                                        promise.Resolve(res);
                                },
                                error => {
                                    try {
                                        // reject first to let the awakened loop see the completed promise
                                        promise.Reject(error);
                                    } finally {
                                        releaseSlot();
                                    }
                                }
                            );

                    } catch (Exception e) {
                        // no need to release the slot here: the promise is rejected
                        // and the loop stops at the next iteration before taking a slot
                        promise.Reject(e);
                    }
                }
                return 0;
            });

            return promise;
        }

    }
}