view Implab/Parallels/ArrayTraits.cs @ 209:a867536c68fc v2

Bound promise to CancellationToken Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise
author cin
date Wed, 16 Nov 2016 03:06:08 +0300
parents 706fccb85524
children cbe10ac0731e
line wrap: on
line source

using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly Action<TSrc> m_action;
            readonly TSrc[] m_source;
            readonly Promise<int> m_promise = new Promise<int>();
            readonly LogicalOperation m_logicalOperation;

            int m_pending;
            int m_next;

            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_logicalOperation = TraceContext.Instance.CurrentOperation;
                m_next = 0;
                m_source = source;
                m_pending = source.Length;
                m_action = action;

                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            public Promise<int> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                } finally {
                    TraceContext.Instance.Leave();
                }
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit < m_source.Length;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_action(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_source.Length);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
            readonly Func<TSrc, TDst> m_transform;
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
            readonly LogicalOperation m_logicalOperation;

            int m_pending;
            int m_next;

            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert (source != null);
                Debug.Assert( transform != null);

                m_next = 0;
                m_source = source;
                m_dest = new TDst[source.Length];
                m_pending = source.Length;
                m_transform = transform;
                m_logicalOperation = TraceContext.Instance.CurrentOperation;

                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            public Promise<TDst[]> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
                try {
                    base.Worker();
                } finally {
                    TraceContext.Instance.Leave();
                }
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit < m_source.Length;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_dest[unit] = m_transform(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_dest);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }

        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var promise = new Promise<TDst[]>();
            var res = new TDst[source.Length];
            var pending = source.Length;

            object locker = new object();
            int slots = threads;

            // Analysis disable AccessToDisposedClosure
            AsyncPool.RunThread<int>(() => {
                for (int i = 0; i < source.Length; i++) {
                    if(promise.IsResolved)
                        break; // stop processing in case of error or cancellation
                    var idx = i;

                    if (Interlocked.Decrement(ref slots) < 0) {
                        lock(locker) {
                            while(slots < 0)
                                Monitor.Wait(locker);
                        }
                    }

                    try {
                        transform(source[i])
                            .On( x => {
                                Interlocked.Increment(ref slots);
                                lock (locker) {
                                    Monitor.Pulse(locker);
                                }
                            })
                            .On(
                                x => {
                                    res[idx] = x;
                                    var left = Interlocked.Decrement(ref pending);
                                    if (left == 0)
                                        promise.Resolve(res);
                                },
                                promise.Reject
                            );

                    } catch (Exception e) {
                        promise.Reject(e);
                    }
                }
                return 0;
            });

            return promise;
        }

    }
}