diff Implab/Parallels/ArrayTraits.cs @ 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400
parents 5a4b735ba669
children e3935fdf59a2
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/ArrayTraits.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,171 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public static class ArrayTraits {
+        class ArrayIterator<TSrc> : DispatchPool<int> {
+            readonly Action<TSrc> m_action;
+            readonly TSrc[] m_source;
+            readonly Promise<int> m_promise = new Promise<int>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
+                : base(threads) {
+
+                Debug.Assert(source != null);
+                Debug.Assert(action != null);
+
+                m_next = 0;
+                m_source = source;
+                m_pending = source.Length;
+                m_action = action;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<int> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_action(m_source[unit]);
+                    var pending = Interlocked.Decrement(ref m_pending);
+                    if (pending == 0)
+                        m_promise.Resolve(m_source.Length);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
+            readonly Func<TSrc, TDst> m_transform;
+            readonly TSrc[] m_source;
+            readonly TDst[] m_dest;
+            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
+                : base(threads) {
+
+                Debug.Assert (source != null);
+                Debug.Assert( transform != null);
+
+                m_next = 0;
+                m_source = source;
+                m_dest = new TDst[source.Length];
+                m_pending = source.Length;
+                m_transform = transform;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<TDst[]> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_dest[unit] = m_transform(m_source[unit]);
+                    var pending = Interlocked.Decrement(ref m_pending);
+                    if (pending == 0)
+                        m_promise.Resolve(m_dest);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+
+            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
+            return mapper.Promise;
+        }
+
+        public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (action == null)
+                throw new ArgumentNullException("action");
+
+            var iter = new ArrayIterator<TSrc>(source, action, threads);
+            return iter.Promise;
+        }
+
+        public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+            if (threads <= 0)
+                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+
+            var promise = new Promise<TDst[]>();
+            var res = new TDst[source.Length];
+            var pending = source.Length;
+            var semaphore = new Semaphore(threads, threads);
+
+            AsyncPool.InvokeNewThread(() => {
+                for (int i = 0; i < source.Length; i++) {
+                    if(promise.State != PromiseState.Unresolved)
+                        break; // stop processing in case of error or cancellation
+                    var idx = i;
+                    semaphore.WaitOne();
+                    try {
+                        var p1 = transform(source[i]);
+                        p1.Anyway(() => semaphore.Release());
+                        p1.Cancelled(() => semaphore.Release());
+                        p1.Then(
+                            x => {
+                                res[idx] = x;
+                                var left = Interlocked.Decrement(ref pending);
+                                if (left == 0)
+                                    promise.Resolve(res);
+                            },
+                            e => promise.Reject(e)
+                        );
+
+                    } catch (Exception e) {
+                        promise.Reject(e);
+                    }
+                }
+                return 0;
+            });
+
+            return promise.Anyway(() => semaphore.Dispose());
+        }
+    }
+}