changeset 16:5a4b735ba669 promises

sync
author cin
date Thu, 07 Nov 2013 20:20:26 +0400 (2013-11-07)
parents 0f982f9b7d4d
children 7cd4a843b4e4
files Implab.Test/AsyncTests.cs Implab.suo Implab/Parallels/ArrayTraits.cs Implab/Parallels/AsyncPool.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs
diffstat 7 files changed, 140 insertions(+), 52 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -242,6 +242,38 @@
         }
 
         [TestMethod]
+        public void ChainedMapTest() {
+
+            using (var pool = new WorkerPool(1,10)) {
+                int count = 10000;
+
+                double[] args = new double[count];
+                var rand = new Random();
+
+                for (int i = 0; i < count; i++)
+                    args[i] = rand.NextDouble();
+
+                var t = Environment.TickCount;
+                var res = args
+                    .ChainedMap(
+                        x => pool.Invoke(
+                            () => Math.Sin(x * x)
+                        ),
+                        4
+                    )
+                    .Join();
+
+                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
+
+                t = Environment.TickCount;
+                for (int i = 0; i < count; i++)
+                    Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
+            }
+        }
+
+        [TestMethod]
         public void ParallelForEachTest() {
 
             int count = 100000;
Binary file Implab.suo has changed
--- a/Implab/Parallels/ArrayTraits.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -39,26 +39,14 @@
             }
 
             protected override bool TryDequeue(out int unit) {
-                int index;
-                unit = -1;
-                do {
-                    index = m_next;
-                    if (index >= m_source.Length)
-                        return false;
-                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
-
-                unit = index;
-                return true;
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
             }
 
             protected override void InvokeUnit(int unit) {
                 try {
                     m_action(m_source[unit]);
-                    int pending;
-                    do {
-                        pending = m_pending;
-                    } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
-                    pending--;
+                    var pending = Interlocked.Decrement(ref m_pending);
                     if (pending == 0)
                         m_promise.Resolve(m_source.Length);
                 } catch (Exception e) {
@@ -101,26 +89,14 @@
             }
 
             protected override bool TryDequeue(out int unit) {
-                int index;
-                unit = -1;
-                do {
-                    index = m_next;
-                    if (index >= m_source.Length)
-                        return false;
-                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
-
-                unit = index;
-                return true;
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
             }
 
             protected override void InvokeUnit(int unit) {
                 try {
                     m_dest[unit] = m_transform(m_source[unit]);
-                    int pending;
-                    do {
-                        pending = m_pending;
-                    } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
-                    pending --;
+                    var pending = Interlocked.Decrement(ref m_pending);
                     if (pending == 0)
                         m_promise.Resolve(m_dest);
                 } catch (Exception e) {
@@ -148,5 +124,48 @@
             var iter = new ArrayIterator<TSrc>(source, action, threads);
             return iter.Promise;
         }
+
+        public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+            if (threads <= 0)
+                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+
+            var promise = new Promise<TDst[]>();
+            var res = new TDst[source.Length];
+            var pending = source.Length;
+            var semaphore = new Semaphore(threads, threads);
+
+            AsyncPool.InvokeNewThread(() => {
+                for (int i = 0; i < source.Length; i++) {
+                    if(promise.State != PromiseState.Unresolved)
+                        break; // stop processing in case of error or cancellation
+                    var idx = i;
+                    semaphore.WaitOne();
+                    try {
+                        var p1 = transform(source[i]);
+                        p1.Anyway(() => semaphore.Release());
+                        p1.Cancelled(() => semaphore.Release());
+                        p1.Then(
+                            x => {
+                                res[idx] = x;
+                                var left = Interlocked.Decrement(ref pending);
+                                if (left == 0)
+                                    promise.Resolve(res);
+                            },
+                            e => promise.Reject(e)
+                        );
+
+                    } catch (Exception e) {
+                        promise.Reject(e);
+                    }
+                }
+                return 0;
+            });
+
+            return promise.Anyway(() => semaphore.Dispose());
+        }
     }
 }
--- a/Implab/Parallels/AsyncPool.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/AsyncPool.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -36,7 +36,6 @@
                 }
             });
             worker.IsBackground = true;
-
             worker.Start();
 
             return p;
--- a/Implab/Parallels/DispatchPool.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -63,9 +63,10 @@
         }
 
         bool StartWorker() {
-            var current = m_runningThreads;
+            int current;
             // use spins to allocate slot for the new thread
             do {
+                current = m_runningThreads;
                 if (current >= m_maxThreads || m_exitRequired != 0)
                     // no more slots left or the pool has been disposed
                     return false;
@@ -84,24 +85,33 @@
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        protected virtual void WakeNewWorker() {
+        protected virtual void WakeNewWorker(bool extend) {
             if (m_suspended > 0)
                 m_hasTasks.Set();
             else
                 StartWorker();
         }
 
+        /// <summary>
+        /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
+        /// </summary>
+        protected void StartIfIdle() {
+            int threads;
+            do {
+
+            }
+        }
+
+        protected virtual void Suspend() {
+            m_hasTasks.WaitOne();
+        }
+
         bool FetchTask(out TUnit unit) {
             do {
                 // exit if requested
                 if (m_exitRequired != 0) {
                     // release the thread slot
-                    int running;
-                    do {
-                        running = m_runningThreads;
-                    } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running));
-                    running--;
-
+                    var running = Interlocked.Decrement(ref m_runningThreads);
                     if (running == 0) // it was the last worker
                         m_hasTasks.Dispose();
                     else
@@ -112,7 +122,7 @@
 
                 // fetch task
                 if (TryDequeue(out unit)) {
-                    WakeNewWorker();
+                    WakeNewWorker(true);
                     return true;
                 }
 
@@ -122,19 +132,21 @@
                 do {
                     runningThreads = m_runningThreads;
                     if (runningThreads <= m_minThreads) {
+                        // check wheather this is the last thread and we have tasks
+
                         exit = false;
                         break;
                     }
                 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
 
                 if (exit) {
-                    Interlocked.Decrement(ref m_runningThreads);
                     return false;
                 }
 
-                // keep this thread and wait
+                // entering suspend state
                 Interlocked.Increment(ref m_suspended);
-                m_hasTasks.WaitOne();
+                // keep this thread and wait                
+                Suspend();
                 Interlocked.Decrement(ref m_suspended);
             } while (true);
         }
--- a/Implab/Parallels/WorkerPool.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -10,20 +10,27 @@
 
         MTQueue<Action> m_queue = new MTQueue<Action>();
         int m_queueLength = 0;
+        readonly int m_threshold = 1;
 
-        public WorkerPool(int minThreads, int maxThreads)
+        public WorkerPool(int minThreads, int maxThreads, int threshold)
             : base(minThreads, maxThreads) {
-                InitPool();
+            m_threshold = threshold;
+            InitPool();
+        }
+
+        public WorkerPool(int minThreads, int maxThreads) :
+            base(minThreads, maxThreads) {
+            InitPool();
         }
 
         public WorkerPool(int threads)
             : base(threads) {
-                InitPool();
+            InitPool();
         }
 
         public WorkerPool()
             : base() {
-                InitPool();
+            InitPool();
         }
 
         public Promise<T> Invoke<T>(Func<T> task) {
@@ -47,11 +54,20 @@
 
         protected void EnqueueTask(Action unit) {
             Debug.Assert(unit != null);
-            Interlocked.Increment(ref m_queueLength);
+            var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
-            // if there are sleeping threads in the pool wake one
-            // probably this will lead a dry run
-            WakeNewWorker();
+
+            if (ThreadCount == 0)
+                // force to start
+                WakeNewWorker(false);
+        }
+
+        protected override void WakeNewWorker(bool extend) {
+            if (extend && m_queueLength <= m_threshold)
+                // in this case we are in active thread and it request for additional workers
+                // satisfy it only when queue is longer than threshold
+                return;
+            base.WakeNewWorker(extend);
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -65,5 +81,10 @@
         protected override void InvokeUnit(Action unit) {
             unit();
         }
+
+        protected override void Suspend() {
+            if (m_queueLength == 0)
+                base.Suspend();
+        }
     }
 }
--- a/Implab/Promise.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Promise.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -103,11 +103,16 @@
         /// <summary>
         /// Выполняет обещание, сообщая об ошибке
         /// </summary>
+        /// <remarks>
+        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
+        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
+        /// будут проигнорированы.
+        /// </remarks>
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
             lock (m_lock) {
-                if (m_state == PromiseState.Cancelled)
+                if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
                     return;
                 if (m_state != PromiseState.Unresolved)
                     throw new InvalidOperationException("The promise is already resolved");