view Implab/Parallels/ArrayTraits.cs @ 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays; rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents
children 5a4b735ba669
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;

namespace Implab.Parallels {
    public static class ArrayTraits {
        /// <summary>
        /// Dispatch pool whose work units are indices into a source array; the
        /// supplied action is invoked once for each claimed index.
        /// </summary>
        /// <remarks>
        /// The promise is resolved with the array length after the last pending
        /// element has been processed, and rejected with the exception thrown by
        /// the action. Dispose is registered on the promise's Anyway and Cancelled
        /// hooks (presumably fired on completion and on cancellation - confirm
        /// against Promise).
        /// NOTE(review): for an empty source no index is ever handed out, so
        /// nothing in this class resolves the promise.
        /// </remarks>
        class ArrayIterator<TSrc> : DispatchPool<int> {
            readonly Action<TSrc> m_action;
            readonly TSrc[] m_source;
            readonly Promise<int> m_promise = new Promise<int>();

            // count of elements whose action has not finished yet
            int m_pending;
            // next index to hand out from TryDequeue
            int m_next;

            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(action != null);

                m_source = source;
                m_action = action;
                m_pending = source.Length;
                m_next = 0;

                m_promise.Anyway(() => Dispose());
                m_promise.Cancelled(() => Dispose());

                // started last so that all fields are assigned before InitPool runs
                InitPool();
            }

            /// <summary>Promise that tracks the whole iteration.</summary>
            public Promise<int> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Atomically claims the next unprocessed index of the source array.
            /// </summary>
            /// <param name="unit">The claimed index, or -1 when nothing is left.</param>
            /// <returns>true if an index was claimed; false once every index has been handed out.</returns>
            protected override bool TryDequeue(out int unit) {
                while (true) {
                    int candidate = m_next;
                    if (candidate >= m_source.Length) {
                        unit = -1;
                        return false;
                    }

                    // the claim succeeds only if nobody advanced m_next in the meantime
                    if (Interlocked.CompareExchange(ref m_next, candidate + 1, candidate) == candidate) {
                        unit = candidate;
                        return true;
                    }
                }
            }

            /// <summary>
            /// Runs the action on the element at index <paramref name="unit"/> and
            /// resolves the promise when that was the last pending element.
            /// </summary>
            protected override void InvokeUnit(int unit) {
                try {
                    m_action(m_source[unit]);

                    // Decrement returns the new value, so exactly one caller observes zero
                    if (Interlocked.Decrement(ref m_pending) == 0)
                        m_promise.Resolve(m_source.Length);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Dispatch pool whose work units are indices into a source array; each
        /// element is passed through the transform and the result is stored at
        /// the same index of a destination array.
        /// </summary>
        /// <remarks>
        /// The promise is resolved with the destination array after the last
        /// pending element has been transformed, and rejected with the exception
        /// thrown by the transform. Dispose is registered on the promise's Anyway
        /// and Cancelled hooks (presumably fired on completion and on
        /// cancellation - confirm against Promise).
        /// NOTE(review): for an empty source no index is ever handed out, so
        /// nothing in this class resolves the promise.
        /// </remarks>
        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
            readonly Func<TSrc, TDst> m_transform;
            readonly TSrc[] m_source;
            readonly TDst[] m_dest;
            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();

            // count of elements that have not been transformed yet
            int m_pending;
            // next index to hand out from TryDequeue
            int m_next;

            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
                : base(threads) {

                Debug.Assert(source != null);
                Debug.Assert(transform != null);

                m_source = source;
                m_transform = transform;
                m_dest = new TDst[source.Length];
                m_pending = source.Length;
                m_next = 0;

                m_promise.Anyway(() => Dispose());
                m_promise.Cancelled(() => Dispose());

                // started last so that all fields are assigned before InitPool runs
                InitPool();
            }

            /// <summary>Promise that delivers the array of transformed elements.</summary>
            public Promise<TDst[]> Promise {
                get { return m_promise; }
            }

            /// <summary>
            /// Atomically claims the next unprocessed index of the source array.
            /// </summary>
            /// <param name="unit">The claimed index, or -1 when nothing is left.</param>
            /// <returns>true if an index was claimed; false once every index has been handed out.</returns>
            protected override bool TryDequeue(out int unit) {
                for (;;) {
                    int claimed = m_next;
                    if (claimed >= m_source.Length) {
                        unit = -1;
                        return false;
                    }

                    // retry when another caller advanced m_next first
                    if (Interlocked.CompareExchange(ref m_next, claimed + 1, claimed) != claimed)
                        continue;

                    unit = claimed;
                    return true;
                }
            }

            /// <summary>
            /// Transforms the element at index <paramref name="unit"/>, stores the
            /// result at the same index and resolves the promise when that was the
            /// last pending element.
            /// </summary>
            protected override void InvokeUnit(int unit) {
                try {
                    m_dest[unit] = m_transform(m_source[unit]);

                    // Decrement returns the new value, so exactly one caller observes zero
                    int remaining = Interlocked.Decrement(ref m_pending);
                    if (remaining == 0)
                        m_promise.Resolve(m_dest);
                } catch (Exception e) {
                    m_promise.Reject(e);
                }
            }
        }

        /// <summary>
        /// Applies <paramref name="transform"/> to every element of
        /// <paramref name="source"/> using a dispatch pool and collects the
        /// results into a new array of the same length.
        /// </summary>
        /// <param name="source">Array to transform; must not be null.</param>
        /// <param name="transform">Function applied to each element; must not be null.</param>
        /// <param name="threads">Thread count handed to the underlying dispatch pool.</param>
        /// <returns>
        /// A promise resolved with the result array, where the result for
        /// <c>source[i]</c> is stored at index <c>i</c>. The promise is rejected
        /// with the exception thrown by <paramref name="transform"/>.
        /// For an empty <paramref name="source"/> the promise is already
        /// resolved with an empty array.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="transform"/> is null.
        /// </exception>
        public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (transform == null)
                throw new ArgumentNullException("transform");

            // An empty array produces no work units, so the mapper would never
            // see its pending counter drop to zero and the promise would stay
            // unresolved forever. Complete it right away without starting a pool.
            if (source.Length == 0) {
                var done = new Promise<TDst[]>();
                done.Resolve(new TDst[0]);
                return done;
            }

            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
            return mapper.Promise;
        }

        /// <summary>
        /// Invokes <paramref name="action"/> for every element of
        /// <paramref name="source"/> using a dispatch pool.
        /// </summary>
        /// <param name="source">Array to iterate; must not be null.</param>
        /// <param name="action">Action invoked for each element; must not be null.</param>
        /// <param name="threads">Thread count handed to the underlying dispatch pool.</param>
        /// <returns>
        /// A promise resolved with the length of <paramref name="source"/> once
        /// every element has been processed, or rejected with the exception
        /// thrown by <paramref name="action"/>. For an empty
        /// <paramref name="source"/> the promise is already resolved with 0.
        /// </returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="source"/> or <paramref name="action"/> is null.
        /// </exception>
        public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
            if (source == null)
                throw new ArgumentNullException("source");
            if (action == null)
                throw new ArgumentNullException("action");

            // An empty array produces no work units, so the iterator would never
            // see its pending counter drop to zero and the promise would stay
            // unresolved forever. Complete it right away without starting a pool.
            if (source.Length == 0) {
                var done = new Promise<int>();
                done.Resolve(0);
                return done;
            }

            var iter = new ArrayIterator<TSrc>(source, action, threads);
            return iter.Promise;
        }
    }
}