diff Implab/Parallels/ArrayTraits.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents 706fccb85524
children cbe10ac0731e
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs	Wed Sep 03 18:34:02 2014 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -1,9 +1,6 @@
 using Implab.Diagnostics;
 using System;
-using System.Collections.Generic;
 using System.Diagnostics;
-using System.Linq;
-using System.Text;
 using System.Threading;
 
 namespace Implab.Parallels {
@@ -12,7 +9,7 @@
             readonly Action<TSrc> m_action;
             readonly TSrc[] m_source;
             readonly Promise<int> m_promise = new Promise<int>();
-            readonly TraceContext m_traceContext;
+            readonly LogicalOperation m_logicalOperation;
 
             int m_pending;
             int m_next;
@@ -23,14 +20,13 @@
                 Debug.Assert(source != null);
                 Debug.Assert(action != null);
 
-                m_traceContext = TraceContext.Snapshot();
+                m_logicalOperation = TraceContext.Instance.CurrentOperation;
                 m_next = 0;
                 m_source = source;
                 m_pending = source.Length;
                 m_action = action;
 
-                m_promise.Anyway(() => Dispose());
-                m_promise.Cancelled(() => Dispose());
+                m_promise.On(Dispose, PromiseEventType.All);
 
                 InitPool();
             }
@@ -42,13 +38,17 @@
             }
 
             protected override void Worker() {
-                TraceContext.Fork(m_traceContext);
-                base.Worker();
+                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
+                try {
+                    base.Worker();
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
             }
 
             protected override bool TryDequeue(out int unit) {
                 unit = Interlocked.Increment(ref m_next) - 1;
-                return unit >= m_source.Length ? false : true;
+                return unit < m_source.Length;
             }
 
             protected override void InvokeUnit(int unit) {
@@ -68,7 +68,7 @@
             readonly TSrc[] m_source;
             readonly TDst[] m_dest;
             readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
-            readonly TraceContext m_traceContext;
+            readonly LogicalOperation m_logicalOperation;
 
             int m_pending;
             int m_next;
@@ -84,10 +84,9 @@
                 m_dest = new TDst[source.Length];
                 m_pending = source.Length;
                 m_transform = transform;
-                m_traceContext = TraceContext.Snapshot();
+                m_logicalOperation = TraceContext.Instance.CurrentOperation;
 
-                m_promise.Anyway(() => Dispose());
-                m_promise.Cancelled(() => Dispose());
+                m_promise.On(Dispose, PromiseEventType.All);
 
                 InitPool();
             }
@@ -99,13 +98,17 @@
             }
 
             protected override void Worker() {
-                TraceContext.Fork(m_traceContext);
-                base.Worker();
+                TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
+                try {
+                    base.Worker();
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
             }
 
             protected override bool TryDequeue(out int unit) {
                 unit = Interlocked.Increment(ref m_next) - 1;
-                return unit >= m_source.Length ? false : true;
+                return unit < m_source.Length;
             }
 
             protected override void InvokeUnit(int unit) {
@@ -140,42 +143,55 @@
             return iter.Promise;
         }
 
-        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+        public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
             if (source == null)
                 throw new ArgumentNullException("source");
             if (transform == null)
                 throw new ArgumentNullException("transform");
             if (threads <= 0)
-                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+                throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
 
             if (source.Length == 0)
-                return Promise<TDst[]>.ResultToPromise(new TDst[0]);
+                return Promise<TDst[]>.FromResult(new TDst[0]);
 
             var promise = new Promise<TDst[]>();
             var res = new TDst[source.Length];
             var pending = source.Length;
 
-            var semaphore = new Semaphore(threads, threads);
+            object locker = new object();
+            int slots = threads;
 
-            AsyncPool.InvokeNewThread(() => {
+            // Analysis disable AccessToDisposedClosure
+            AsyncPool.RunThread<int>(() => {
                 for (int i = 0; i < source.Length; i++) {
                     if(promise.IsResolved)
                         break; // stop processing in case of error or cancellation
                     var idx = i;
-                    semaphore.WaitOne();
+
+                    if (Interlocked.Decrement(ref slots) < 0) {
+                        lock(locker) {
+                            while(slots < 0)
+                                Monitor.Wait(locker);
+                        }
+                    }
+
                     try {
-                        var p1 = transform(source[i]);
-                        p1.Anyway(() => semaphore.Release());
-                        p1.Cancelled(() => semaphore.Release());
-                        p1.Then(
-                            x => {
-                                res[idx] = x;
-                                var left = Interlocked.Decrement(ref pending);
-                                if (left == 0)
-                                    promise.Resolve(res);
-                            },
-                            e => promise.Reject(e)
-                        );
+                        transform(source[i])
+                            .On( x => {
+                                Interlocked.Increment(ref slots);
+                                lock (locker) {
+                                    Monitor.Pulse(locker);
+                                }
+                            })
+                            .On(
+                                x => {
+                                    res[idx] = x;
+                                    var left = Interlocked.Decrement(ref pending);
+                                    if (left == 0)
+                                        promise.Resolve(res);
+                                },
+                                promise.Reject
+                            );
 
                     } catch (Exception e) {
                         promise.Reject(e);
@@ -184,7 +200,7 @@
                 return 0;
             });
 
-            return promise.Anyway(() => semaphore.Dispose());
+            return promise;
         }
 
     }