diff Implab/Parallels/ArrayTraits.cs @ 16:5a4b735ba669 promises

sync
author cin
date Thu, 07 Nov 2013 20:20:26 +0400
parents 0f982f9b7d4d
children e3935fdf59a2
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -39,26 +39,14 @@
             }
 
             protected override bool TryDequeue(out int unit) {
-                int index;
-                unit = -1;
-                do {
-                    index = m_next;
-                    if (index >= m_source.Length)
-                        return false;
-                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
-
-                unit = index;
-                return true;
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
             }
 
             protected override void InvokeUnit(int unit) {
                 try {
                     m_action(m_source[unit]);
-                    int pending;
-                    do {
-                        pending = m_pending;
-                    } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
-                    pending--;
+                    var pending = Interlocked.Decrement(ref m_pending);
                     if (pending == 0)
                         m_promise.Resolve(m_source.Length);
                 } catch (Exception e) {
@@ -101,26 +89,14 @@
             }
 
             protected override bool TryDequeue(out int unit) {
-                int index;
-                unit = -1;
-                do {
-                    index = m_next;
-                    if (index >= m_source.Length)
-                        return false;
-                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
-
-                unit = index;
-                return true;
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
             }
 
             protected override void InvokeUnit(int unit) {
                 try {
                     m_dest[unit] = m_transform(m_source[unit]);
-                    int pending;
-                    do {
-                        pending = m_pending;
-                    } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
-                    pending --;
+                    var pending = Interlocked.Decrement(ref m_pending);
                     if (pending == 0)
                         m_promise.Resolve(m_dest);
                 } catch (Exception e) {
@@ -148,5 +124,48 @@
             var iter = new ArrayIterator<TSrc>(source, action, threads);
             return iter.Promise;
         }
+
+        public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+            if (threads <= 0)
+                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+
+            var promise = new Promise<TDst[]>();
+            var res = new TDst[source.Length];
+            var pending = source.Length;
+            var semaphore = new Semaphore(threads, threads);
+
+            AsyncPool.InvokeNewThread(() => {
+                for (int i = 0; i < source.Length; i++) {
+                    if(promise.State != PromiseState.Unresolved)
+                        break; // stop processing in case of error or cancellation
+                    var idx = i;
+                    semaphore.WaitOne();
+                    try {
+                        var p1 = transform(source[i]);
+                        p1.Anyway(() => semaphore.Release());
+                        p1.Cancelled(() => semaphore.Release());
+                        p1.Then(
+                            x => {
+                                res[idx] = x;
+                                var left = Interlocked.Decrement(ref pending);
+                                if (left == 0)
+                                    promise.Resolve(res);
+                            },
+                            e => promise.Reject(e)
+                        );
+
+                    } catch (Exception e) {
+                        promise.Reject(e);
+                    }
+                }
+                return 0;
+            });
+
+            return promise.Anyway(() => semaphore.Dispose());
+        }
     }
 }