view Implab/Parallels/ArrayTraits.cs @ 145:706fccb85524 v2

RC: cancellation support for promises + tests
author cin
date Sun, 08 Mar 2015 02:52:27 +0300
parents a336cb13c6a9
children cbe10ac0731e
line wrap: on
line source

using Implab.Diagnostics;
using System;
using System.Diagnostics;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly Action<TSrc> m_action;
            readonly TSrc[] m_source;
            readonly Promise<int> m_promise = new Promise<int>();
            readonly LogicalOperation m_logicalOperation;

            int m_pending;
            int m_next;

            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_logicalOperation = TraceContext.Instance.CurrentOperation;
                m_next = 0;
                m_source = source;
                m_pending = source.Length;
                m_action = action;

                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            public Promise<int> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
                try {
                    base.Worker();
                } finally {
                    TraceContext.Instance.Leave();
                }
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit < m_source.Length;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_action(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_source.Length);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
            readonly Func<TSrc, TDst> m_transform;
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
            readonly LogicalOperation m_logicalOperation;

            int m_pending;
            int m_next;

            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert (source != null);
                Debug.Assert( transform != null);

                m_next = 0;
                m_source = source;
                m_dest = new TDst[source.Length];
                m_pending = source.Length;
                m_transform = transform;
                m_logicalOperation = TraceContext.Instance.CurrentOperation;

                m_promise.On(Dispose, PromiseEventType.All);

                InitPool();
            }

            public Promise<TDst[]> Promise {
                get {
                    return m_promise;
                }
            }

            protected override void Worker() {
                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
                try {
                    base.Worker();
                } finally {
                    TraceContext.Instance.Leave();
                }
            }

            protected override bool TryDequeue(out int unit) {
                unit = Interlocked.Increment(ref m_next) - 1;
                return unit < m_source.Length;
            }

            protected override void InvokeUnit(int unit) {
                try {
                    m_dest[unit] = m_transform(m_source[unit]);
                    var pending = Interlocked.Decrement(ref m_pending);
                    if (pending == 0)
                        m_promise.Resolve(m_dest);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }

        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");
            if (threads <= 0)
                throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

            if (source.Length == 0)
                return Promise<TDst[]>.FromResult(new TDst[0]);

            var promise = new Promise<TDst[]>();
            var res = new TDst[source.Length];
            var pending = source.Length;

            object locker = new object();
            int slots = threads;

            // Analysis disable AccessToDisposedClosure
            AsyncPool.RunThread<int>(() => {
                for (int i = 0; i < source.Length; i++) {
                    if(promise.IsResolved)
                        break; // stop processing in case of error or cancellation
                    var idx = i;

                    if (Interlocked.Decrement(ref slots) < 0) {
                        lock(locker) {
                            while(slots < 0)
                                Monitor.Wait(locker);
                        }
                    }

                    try {
                        transform(source[i])
                            .On( x => {
                                Interlocked.Increment(ref slots);
                                lock (locker) {
                                    Monitor.Pulse(locker);
                                }
                            })
                            .On(
                                x => {
                                    res[idx] = x;
                                    var left = Interlocked.Decrement(ref pending);
                                    if (left == 0)
                                        promise.Resolve(res);
                                },
                                promise.Reject
                            );

                    } catch (Exception e) {
                        promise.Reject(e);
                    }
                }
                return 0;
            });

            return promise;
        }

    }
}