changeset 10:aa33d0bb8c0c promises

implemeted new cancellable promises concept
author cin
date Sun, 03 Nov 2013 18:07:38 +0400
parents c82e0dfbb4dd
children 6ec82bf68c8e
files .hgignore Implab.Test/AsyncTests.cs Implab.Test/Implab.Test.csproj Implab.Test/PromiseHelper.cs Implab.suo Implab/IPromise.cs Implab/Promise.cs Implab/TaskController.cs
diffstat 8 files changed, 185 insertions(+), 119 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Sat Nov 02 00:55:47 2013 +0400
+++ b/.hgignore	Sun Nov 03 18:07:38 2013 +0400
@@ -10,3 +10,4 @@
 Implab.Fx/bin/
 Implab.Fx.Test/bin/
 Implab.Fx.Test/obj/
+_ReSharper.Implab/
--- a/Implab.Test/AsyncTests.cs	Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Sun Nov 03 18:07:38 2013 +0400
@@ -1,10 +1,9 @@
 using System;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Implab;
 using System.Reflection;
 using System.Threading;
 
-namespace Implab.Tests
+namespace Implab.Test
 {
 	[TestClass]
 	public class AsyncTests
@@ -90,12 +89,39 @@
 		public void PoolTest ()
 		{
 			var pid = Thread.CurrentThread.ManagedThreadId;
-			var p = AsyncPool.Invoke (() => {
-				return Thread.CurrentThread.ManagedThreadId;
-			});
+			var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
 
 			Assert.AreNotEqual (pid, p.Join ());
 		}
+
+        [TestMethod]
+        public void ComplexCase1Test() {
+            var flags = new bool[3];
+
+            // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
+
+            var p = PromiseHelper
+                .Sleep(200, "Alan")
+                .Cancelled(() => flags[0] = true)
+                .Chain(x =>
+                    PromiseHelper
+                        .Sleep(200, "Hi, " + x)
+                        .Map( y => y )
+                        .Cancelled(() => flags[1] = true)
+                )
+                .Cancelled(() => flags[2] = true);
+            Thread.Sleep(300);
+            p.Cancel();
+            try {
+                Assert.AreEqual(p.Join(), "Hi, Alan");
+                Assert.Fail("Shouldn't get here");
+            } catch(OperationCanceledException) {
+            }
+
+            Assert.IsFalse(flags[0]);
+            Assert.IsTrue(flags[1]);
+            Assert.IsTrue(flags[2]);
+        }
 	}
 }
 
--- a/Implab.Test/Implab.Test.csproj	Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab.Test/Implab.Test.csproj	Sun Nov 03 18:07:38 2013 +0400
@@ -46,6 +46,7 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="AsyncTests.cs" />
+    <Compile Include="PromiseHelper.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab.Test/PromiseHelper.cs	Sun Nov 03 18:07:38 2013 +0400
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Test {
+    class PromiseHelper {
+        public static Promise<T> Sleep<T>(int timeout, T retVal) {
+            return AsyncPool.Invoke(() => {
+                Thread.Sleep(timeout);
+                return retVal;
+            });
+        }
+    }
+}
Binary file Implab.suo has changed
--- a/Implab/IPromise.cs	Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/IPromise.cs	Sun Nov 03 18:07:38 2013 +0400
@@ -29,5 +29,12 @@
         /// <param name="dependencies">Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise</param>
         /// <returns></returns>
         bool Cancel(bool dependencies);
+
+        /// <summary>
+        /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
+        /// handler will be invoked immediatelly.
+        /// </summary>
+        /// <param name="handler">The handler</param>
+        void HandleCancelled(Action handler);
     }
 }
--- a/Implab/Promise.cs	Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/Promise.cs	Sun Nov 03 18:07:38 2013 +0400
@@ -1,8 +1,6 @@
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reflection;
-using System.Text;
 using System.Diagnostics;
 using System.Threading;
 
@@ -10,9 +8,9 @@
 
     public delegate void ErrorHandler(Exception e);
 
-    public delegate void ResultHandler<T>(T result);
-    public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
-    public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
+    public delegate void ResultHandler<in T>(T result);
+    public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
+    public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
 
     /// <summary>
     /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -55,20 +53,19 @@
             public ErrorHandler errorHandler;
         }
 
-        IPromise m_parent;
+        readonly IPromise m_parent;
 
         LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
         LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
 
-        object m_lock = new Object();
-        bool m_cancellable;
+        readonly object m_lock = new Object();
+        readonly bool m_cancellable;
+        int m_childrenCount = 0;
 
         PromiseState m_state;
         T m_result;
         Exception m_error;
 
-        int m_childrenCount;
-
         public Promise() {
             m_cancellable = true;
         }
@@ -76,15 +73,14 @@
         public Promise(IPromise parent, bool cancellable) {
             m_cancellable = cancellable;
             m_parent = parent;
+            if (parent != null)
+                parent.HandleCancelled(InternalCancel);
         }
 
-        /// <summary>
-        /// Событие, возникающее при отмене асинхронной операции.
-        /// </summary>
-        /// <description>
-        /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
-        /// </description>
-        public event EventHandler Cancelled;
+        void InternalCancel() {
+            // don't try to cancel parent :)
+            Cancel(false);
+        }
 
         /// <summary>
         /// Выполняет обещание, сообщая об успешном выполнении.
@@ -101,14 +97,7 @@
                 m_state = PromiseState.Resolved;
             }
 
-            // state has been changed to rejected new handlers can't be added
-
-            foreach (var handler in m_resultHandlers)
-                InvokeHandler(handler);
-
-            /* ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler); */
+            OnStateChanged();
         }
 
         /// <summary>
@@ -126,14 +115,7 @@
                 m_state = PromiseState.Rejected;
             }
 
-            // state has been changed to rejected new handlers can't be added
-
-            foreach (var handler in m_resultHandlers)
-                InvokeHandler(handler);
-
-            /*ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler);*/
+            OnStateChanged();
         }
 
         /// <summary>
@@ -144,6 +126,39 @@
             return Cancel(true);
         }
 
+        protected virtual void OnStateChanged() {
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.resultHandler != null)
+                                resultHandlerInfo.resultHandler(m_result);
+                        } catch (Exception e) {
+                            try {
+                                if (resultHandlerInfo.errorHandler != null)
+                                    resultHandlerInfo.errorHandler(e);
+                            } catch { }
+                        }
+                    break;
+                case PromiseState.Cancelled:
+                    foreach (var cancelHandler in m_cancelHandlers)
+                        cancelHandler();
+                    break;
+                case PromiseState.Rejected:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.errorHandler != null)
+                                resultHandlerInfo.errorHandler(m_error);
+                        } catch { }
+                    break;
+                default:
+                    throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
+            }
+
+            m_resultHandlers = null;
+            m_cancelHandlers = null;
+        }
+
         /// <summary>
         /// Добавляет обработчики событий выполнения обещания.
         /// </summary>
@@ -162,15 +177,11 @@
 
             if (success != null)
                 handlerInfo.resultHandler = x => {
-                    try {
-                        success(x);
-                        medium.Resolve(x);
-                    } catch (Exception e) {
-                        medium.Reject(e);
-                    }
+                    success(x);
+                    medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = x => medium.Resolve(x);
+                handlerInfo.resultHandler = medium.Resolve;
 
             if (error != null)
                 handlerInfo.errorHandler = x => {
@@ -180,7 +191,7 @@
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = x => medium.Reject(x);
+                handlerInfo.errorHandler = medium.Reject;
 
             AddHandler(handlerInfo);
 
@@ -203,6 +214,7 @@
 
             AddHandler(new ResultHandlerInfo {
                 resultHandler = x => {
+                    // to avoid handler being called multiple times we handle exception by ourselfs
                     try {
                         handler();
                         medium.Resolve(x);
@@ -234,20 +246,15 @@
                 throw new ArgumentNullException("mapper");
 
             // создаем прицепленное обещание
-            Promise<TNew> chained = new Promise<TNew>();
+            var chained = new Promise<TNew>();
 
             AddHandler(new ResultHandlerInfo() {
-                resultHandler = delegate(T result) {
-                    try {
-                        // если преобразование выдаст исключение, то сработает reject сцепленного deferred
-                        chained.Resolve(mapper(result));
-                    } catch (Exception e) {
-                        chained.Reject(e);
-                    }
-                },
+                resultHandler = result => chained.Resolve(mapper(result)),
                 errorHandler = delegate(Exception e) {
                     if (error != null)
-                        error(e);
+                        try {
+                            error(e);
+                        } catch { }
                     // в случае ошибки нужно передать исключение дальше по цепочке
                     chained.Reject(e);
                 }
@@ -276,19 +283,21 @@
             // создать посредника, к которому будут подвызяваться следующие обработчики.
             // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
             // передать через него результаты работы.
-            Promise<TNew> medium = new Promise<TNew>();
+            var medium = new Promise<TNew>(this, true);
 
-            AddHandler(new ResultHandlerInfo() {
+            AddHandler(new ResultHandlerInfo {
                 resultHandler = delegate(T result) {
-                    try {
-                        chained(result).Then(
-                            x => medium.Resolve(x),
-                            e => medium.Reject(e)
-                            );
-                    } catch (Exception e) {
-                        // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
-                        medium.Reject(e);
-                    }
+                    if (medium.State == PromiseState.Cancelled)
+                        return;
+
+                    var promise = chained(result);
+
+                    // notify chained operation that it's not needed
+                    medium.Cancelled(() => promise.Cancel());
+                    promise.Then(
+                        medium.Resolve,
+                        medium.Reject
+                        );
                 },
                 errorHandler = delegate(Exception e) {
                     if (error != null)
@@ -305,6 +314,22 @@
             return Chain(chained, null);
         }
 
+        public Promise<T> Cancelled(Action handler) {
+            if (handler == null)
+                return this;
+            lock (m_lock) {
+                if (m_state == PromiseState.Unresolved)
+                    m_cancelHandlers.AddLast(handler);
+                else if (m_state == PromiseState.Cancelled)
+                    handler();
+            }
+            return this;
+        }
+
+        public void HandleCancelled(Action handler) {
+            Cancelled(handler);
+        }
+
         /// <summary>
         /// Дожидается отложенного обещания и в случае успеха, возвращает
         /// его, результат, в противном случае бросает исключение.
@@ -327,51 +352,37 @@
         /// <param name="timeout">Время ожидания</param>
         /// <returns>Результат выполнения обещания</returns>
         public T Join(int timeout) {
-            ManualResetEvent evt = new ManualResetEvent(false);
+            var evt = new ManualResetEvent(false);
             Anyway(() => evt.Set());
+            Cancelled(() => evt.Set());
 
             if (!evt.WaitOne(timeout, true))
                 throw new TimeoutException();
 
-            if (m_error != null)
-                throw new TargetInvocationException(m_error);
-            else
-                return m_result;
+            switch (State) {
+                case PromiseState.Resolved:
+                    return m_result;
+                case PromiseState.Cancelled:
+                    throw new OperationCanceledException();
+                case PromiseState.Rejected:
+                    throw new TargetInvocationException(m_error);
+                default:
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+            }
         }
 
         public T Join() {
             return Join(Timeout.Infinite);
         }
 
-        /// <summary>
-        /// Данный метод последовательно извлекает обработчики обещания и когда
-        /// их больше не осталось - ставит состояние "разрешено".
-        /// </summary>
-        /// <param name="handler">Информация об обработчике</param>
-        /// <returns>Признак того, что еще остались обработчики в очереди</returns>
-        bool FetchNextHandler(out ResultHandlerInfo handler) {
-            handler = default(ResultHandlerInfo);
-
-            lock (this) {
-                Debug.Assert(m_state != PromiseState.Unresolved);
-
-                if (m_resultHandlers.Count > 0) {
-                    handler = m_resultHandlers.First.Value;
-                    m_resultHandlers.RemoveFirst();
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        }
-
         void AddHandler(ResultHandlerInfo handler) {
             bool invokeRequired = false;
 
-            lock (this) {
-                if (m_state == PromiseState.Unresolved)
+            lock (m_lock) {
+                m_childrenCount++;
+                if (m_state == PromiseState.Unresolved) {
                     m_resultHandlers.AddLast(handler);
-                else
+                } else
                     invokeRequired = true;
             }
 
@@ -381,18 +392,27 @@
         }
 
         void InvokeHandler(ResultHandlerInfo handler) {
-            if (m_error == null) {
-                try {
-                    if (handler.resultHandler != null)
-                        handler.resultHandler(m_result);
-                } catch { }
-            }
-
-            if (m_error != null) {
-                try {
-                    if (handler.errorHandler != null)
-                        handler.errorHandler(m_error);
-                } catch { }
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    try {
+                        if (handler.resultHandler != null)
+                            handler.resultHandler(m_result);
+                    } catch (Exception e) {
+                        try {
+                            if (handler.errorHandler != null)
+                                handler.errorHandler(e);
+                        } catch { }
+                    }
+                    break;
+                case PromiseState.Rejected:
+                    try {
+                        if (handler.errorHandler != null)
+                            handler.errorHandler(m_error);
+                    } catch { }
+                    break;
+                default:
+                    // do nothing
+                    return;
             }
         }
 
@@ -426,15 +446,11 @@
                 }
             }
 
-            if (dependencies && m_parent != null && m_parent.IsExclusive) {
-                // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive)
-                m_parent.Cancel(true);
-            }
+            if (result)
+                OnStateChanged();
 
-            if (result) {
-                // state has been changed to cancelled, new handlers can't be added
-                foreach (var handler in m_cancelHandlers)
-                    handler();
+            if (dependencies && m_parent != null && m_parent.IsExclusive) {
+                m_parent.Cancel(true);
             }
 
             return result;
--- a/Implab/TaskController.cs	Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/TaskController.cs	Sun Nov 03 18:07:38 2013 +0400
@@ -14,9 +14,8 @@
     /// </remarks>
     class TaskController
     {
-        object m_lock;
+        readonly object m_lock;
         string m_message;
-        bool m_cancelled;
 
         float m_current;
         float m_max;