changeset 149:eb793fbbe4ea v2

fixed promises cancellation
author cin
date Wed, 06 May 2015 17:11:27 +0300 (2015-05-06)
parents e6d4b41f0101
children 3258399cba83
files Implab.Test/AsyncTests.cs Implab.Test/CancelationTests.cs Implab.Test/PromiseHelper.cs Implab/ActionChainTask.cs Implab/ActionChainTaskBase.cs Implab/ActionChainTaskT.cs Implab/ActionTask.cs Implab/ActionTaskBase.cs Implab/ActionTaskT.cs Implab/FuncChainTask.cs Implab/FuncChainTaskBase.cs Implab/FuncChainTaskT.cs Implab/FuncTask.cs Implab/FuncTaskBase.cs Implab/FuncTaskT.cs Implab/Parallels/WorkerPool.cs Implab/PromiseExtensions.cs
diffstat 17 files changed, 153 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Wed May 06 17:11:27 2015 +0300
@@ -746,33 +746,31 @@
         public void ChainedCancel2Test() {
             // при отмене цепочки обещаний, вложенные операции также должны отменяться
             var pSurvive = new Promise<bool>();
-            var hemStarted = new ManualResetEvent(false);
+            var hemStarted = new Signal();
             var p = PromiseHelper
                 .Sleep(1, "Hi, HAL!")
-                .Chain(x => {
+                .Chain(() => {
                     hemStarted.Set();
                     // запускаем две асинхронные операции
                     var result = PromiseHelper
-                        .Sleep(100000000, "HEM ENABLED!!!")
-                        .Then(s => {
-                            pSurvive.Resolve(false);
-                            return s;
-                        });
+                        .Sleep(2000, "HEM ENABLED!!!")
+                        .Then(() => pSurvive.Resolve(false));
 
                     result
                         .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
-                    
+
                     return result;
                 });
 
-            hemStarted.WaitOne();
+            hemStarted.Wait();
             p.Cancel();
 
             try {
                 p.Join();
+                Assert.Fail();
             } catch (OperationCanceledException) {
-                Assert.IsTrue(pSurvive.Join());
             }
+            Assert.IsTrue(pSurvive.Join());
         }
 
         [TestMethod]
--- a/Implab.Test/CancelationTests.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab.Test/CancelationTests.cs	Wed May 06 17:11:27 2015 +0300
@@ -48,7 +48,7 @@
             bool run = false;
             var task = new ActionTask(() => {
                 run = true;
-            }, null, null);
+            }, null, null, true);
 
             // request cancelation
             task.Cancel();
@@ -65,7 +65,7 @@
             var task = new ActionTask(() => {
                 started.Set();
                 finish.Wait();
-            }, null, null);
+            }, null, null, true);
 
             AsyncPool.RunThread(() => {
                 task.Resolve();
@@ -85,14 +85,18 @@
 
         [TestMethod]
         public void CancelTaskChainFromBottom() {
+            var started = new Signal();
             var check1 = new Signal();
             var requested = false;
             var p1 = AsyncPool.RunThread(token => {
                 token.CancellationRequested(reason => requested = true);
+                started.Set();
                 check1.Wait();
                 token.CancelOperationIfRequested();
             });
 
+            started.Wait();
+
             var p2 = p1.Then(() => {
             });
 
--- a/Implab.Test/PromiseHelper.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab.Test/PromiseHelper.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,7 +4,8 @@
 namespace Implab.Test {
     static class PromiseHelper {
         public static IPromise<T> Sleep<T>(int timeout, T retVal) {
-            return AsyncPool.Invoke(() => {
+            return AsyncPool.Invoke((ct) => {
+                ct.CancellationRequested(ct.CancelOperation);
                 Thread.Sleep(timeout);
                 return retVal;
             });
--- a/Implab/ActionChainTask.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionChainTask.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,14 +4,16 @@
     public class ActionChainTask : ActionChainTaskBase, IDeferred {
         readonly Func<IPromise> m_task;
 
-        public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
+        public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
             m_task = task;
         }
 
         public void Resolve() {
             if (m_task != null && LockCancelation()) {
                 try {
-                    Observe(m_task());
+                    var p = m_task();
+                    p.On(SetResult, HandleErrorInternal, SetCancelled);
+                    CancellationRequested(p.Cancel);
                 } catch(Exception err) {
                     HandleErrorInternal(err);
                 }
--- a/Implab/ActionChainTaskBase.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionChainTaskBase.cs	Wed May 06 17:11:27 2015 +0300
@@ -8,9 +8,11 @@
 
         int m_cancelationLock;
 
-        protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) {
+        protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) {
             m_error = error;
             m_cancel = cancel;
+            if (autoCancellable)
+                CancellationRequested(CancelOperation);
         }
 
         public void Reject(Exception error) {
@@ -21,21 +23,26 @@
 
 
         public override void CancelOperation(Exception reason) {
-            if (m_cancel != null && LockCancelation()) {
-                try {
-                    Observe(m_cancel(reason));
-                } catch(Exception err) {
-                    HandleErrorInternal(err);
+            if (LockCancelation()) {
+                if (m_cancel != null) {
+                    try {
+                        m_cancel(reason).On(SetResult, SetError, SetCancelled);
+                    } catch (Exception err) {
+                        HandleErrorInternal(err);
+                    }
+                } else {
+                    SetCancelled(reason);
                 }
             }
-
         }
 
         protected void HandleErrorInternal(Exception error) {
             if (m_error != null) {
                 try {
-                    Observe(m_error(error));
-                } catch(Exception err) {
+                    var p = m_error(error);
+                    p.On(SetResult,SetError,SetCancelled);
+                    CancellationRequested(p.Cancel);
+                } catch (Exception err) {
                     SetError(err);
                 }
             } else {
@@ -43,17 +50,6 @@
             }
         }
 
-        protected void Observe(IPromise operation) {
-            if (operation == null)
-                throw new NullReferenceException("The task returned null promise");
-
-            // pass operation results to the current promise
-            operation.On(SetResult, SetError, SetCancelled);
-
-            // pass the cancelation request
-            CancellationRequested(operation.Cancel);
-        }
-
         protected bool LockCancelation() {
             return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
         }
--- a/Implab/ActionChainTaskT.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionChainTaskT.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,14 +4,16 @@
     public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> {
         readonly Func<T, IPromise> m_task;
 
-        public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) {
+        public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
             m_task = task;
         }
 
         public void Resolve(T value) {
             if (m_task != null && LockCancelation()) {
                 try {
-                    Observe(m_task(value));
+                    var p = m_task(value);
+                    p.On(SetResult, HandleErrorInternal, SetCancelled);
+                    CancellationRequested(p.Cancel);
                 } catch(Exception err) {
                     HandleErrorInternal(err);
                 }
--- a/Implab/ActionTask.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionTask.cs	Wed May 06 17:11:27 2015 +0300
@@ -3,7 +3,7 @@
 namespace Implab {
     public class ActionTask : ActionTaskBase, IDeferred {
         readonly Action m_task;
-        public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
+        public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
             m_task = task;
         }
 
--- a/Implab/ActionTaskBase.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionTaskBase.cs	Wed May 06 17:11:27 2015 +0300
@@ -8,9 +8,11 @@
 
         int m_cancelationLock;
 
-        protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) {
+        protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel, bool autoCancellable) {
             m_error = error;
             m_cancel = cancel;
+            if (autoCancellable)
+                CancellationRequested(CancelOperation);
         }
 
         public void Reject(Exception error) {
--- a/Implab/ActionTaskT.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/ActionTaskT.cs	Wed May 06 17:11:27 2015 +0300
@@ -3,7 +3,7 @@
 namespace Implab {
     public class ActionTask<T> : ActionTaskBase, IDeferred<T> {
         readonly Action<T> m_task;
-        public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) {
+        public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
             m_task = task;
         }
 
--- a/Implab/FuncChainTask.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncChainTask.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,14 +4,17 @@
     public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred {
         readonly Func<IPromise<TResult>> m_task;
 
-        public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
+        public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable)
+            : base(error, cancel, autoCancellable) {
             m_task = task;
         }
 
         public void Resolve() {
             if (m_task != null && LockCancelation()) {
                 try {
-                    Observe(m_task());
+                    var operation = m_task();
+                    operation.On(SetResult, HandleErrorInternal, SetCancelled);
+                    CancellationRequested(operation.Cancel);
                 } catch (Exception err) {
                     HandleErrorInternal(err);
                 }
--- a/Implab/FuncChainTaskBase.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncChainTaskBase.cs	Wed May 06 17:11:27 2015 +0300
@@ -8,9 +8,11 @@
 
         int m_cancelationLock;
 
-        protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) {
+        protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) {
             m_error = error;
             m_cancel = cancel;
+            if (autoCancellable)
+                CancellationRequested(CancelOperation);
         }
 
         public void Reject(Exception error) {
@@ -19,11 +21,15 @@
         }
 
         public override void CancelOperation(Exception reason) {
-            if (m_cancel != null && LockCancelation()) {
-                try {
-                    Observe(m_cancel(reason));
-                } catch(Exception err) {
-                    HandleErrorInternal(err);
+            if (LockCancelation()) {
+                if (m_cancel != null) {
+                    try {
+                        m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled);
+                    } catch (Exception err) {
+                        HandleErrorInternal(err);
+                    }
+                } else {
+                    SetCancelled(reason);
                 }
             }
 
@@ -32,7 +38,10 @@
         protected void HandleErrorInternal(Exception error) {
             if (m_error != null) {
                 try {
-                    Observe(m_error(error));
+                    var operation = m_error(error);
+
+                    operation.On(SetResult, SetError, SetCancelled);
+                    CancellationRequested(operation.Cancel);
                 } catch(Exception err) {
                     SetError(err);
                 }
@@ -41,17 +50,6 @@
             }
         }
 
-        protected void Observe(IPromise<TResult> operation) {
-            if (operation == null)
-                throw new NullReferenceException("The task returned null promise");
-
-            // pass operation results to the current promise
-            operation.On(SetResult, SetError, SetCancelled);
-
-            // pass the cancelation request
-            CancellationRequested(operation.Cancel);
-        }
-
         protected bool LockCancelation() {
             return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0);
         }
--- a/Implab/FuncChainTaskT.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncChainTaskT.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,14 +4,16 @@
     public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> {
         readonly Func<TArg, IPromise<TResult>> m_task;
 
-        public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){
+        public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){
             m_task = task;
         }
 
         public void Resolve(TArg value) {
             if (m_task != null && LockCancelation()) {
                 try {
-                    Observe(m_task(value));
+                    var operation = m_task(value);
+                    operation.On(SetResult, HandleErrorInternal, SetCancelled);
+                    CancellationRequested(operation.Cancel);
                 } catch (Exception err) {
                     HandleErrorInternal(err);
                 }
--- a/Implab/FuncTask.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncTask.cs	Wed May 06 17:11:27 2015 +0300
@@ -5,7 +5,7 @@
     public class FuncTask<T> : FuncTaskBase<T>, IDeferred {
         readonly Func<T> m_task;
 
-        public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel) : base(error,cancel) {
+        public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel, bool autoCancellable) : base(error, cancel, autoCancellable) {
             m_task = task;
         }
 
--- a/Implab/FuncTaskBase.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncTaskBase.cs	Wed May 06 17:11:27 2015 +0300
@@ -8,9 +8,11 @@
 
         int m_cancelationLock;
 
-        protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel) {
+        protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel, bool autoCancellable) {
             m_error = error;
             m_cancel = cancel;
+            if (autoCancellable)
+                CancellationRequested(CancelOperation);
         }
 
         public void Reject(Exception error) {
--- a/Implab/FuncTaskT.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/FuncTaskT.cs	Wed May 06 17:11:27 2015 +0300
@@ -4,7 +4,7 @@
     public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> {
         readonly Func<TArg, TResult> m_task;
 
-        public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel) :  base(error,cancel) {
+        public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel, bool autoCancellable) :  base(error,cancel, autoCancellable) {
             m_task = task;
         }
 
--- a/Implab/Parallels/WorkerPool.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/Parallels/WorkerPool.cs	Wed May 06 17:11:27 2015 +0300
@@ -30,7 +30,49 @@
             InitPool();
         }
 
-        public Promise<T> Invoke<T>(Func<T> task) {
+        public IPromise<T> Invoke<T>(Func<T> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new FuncTask<T>(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise Invoke(Action task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new ActionTask(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
             if (task == null)
                 throw new ArgumentNullException("task");
             if (IsDisposed)
@@ -43,7 +85,35 @@
             EnqueueTask(delegate {
                 TraceContext.Instance.EnterLogicalOperation(lop, false);
                 try {
-                    promise.Resolve(task());
+                    if (!promise.CancelOperationIfRequested())
+                        promise.Resolve(task(promise));
+                } catch (Exception e) {
+                    promise.Reject(e);
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
+            });
+
+            return promise;
+        }
+
+        public IPromise Invoke<T>(Action<ICancellationToken> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new Promise();
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+                try {
+                    if (!promise.CancelOperationIfRequested()) {
+                        task(promise);
+                        promise.Resolve();
+                    }
                 } catch (Exception e) {
                     promise.Reject(e);
                 } finally {
--- a/Implab/PromiseExtensions.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/PromiseExtensions.cs	Wed May 06 17:11:27 2015 +0300
@@ -178,7 +178,7 @@
         public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
             Safe.ArgumentNotNull(that, "that");
 
-            var d = new ActionTask(success, error, cancel);
+            var d = new ActionTask(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);
@@ -196,7 +196,7 @@
         public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
             Safe.ArgumentNotNull(that, "that");
 
-            var d = new FuncTask<T>(success, error, cancel);
+            var d = new FuncTask<T>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);
@@ -213,7 +213,7 @@
 
         public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
             Safe.ArgumentNotNull(that, "that");
-            var d = new FuncTask<T,T2>(success, error, cancel);
+            var d = new FuncTask<T,T2>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);
@@ -232,7 +232,7 @@
         public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
             Safe.ArgumentNotNull(that, "that");
 
-            var d = new ActionChainTask(success, error, cancel);
+            var d = new ActionChainTask(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);
@@ -250,7 +250,7 @@
         public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
             Safe.ArgumentNotNull(that, "that");
 
-            var d = new FuncChainTask<T>(success, error, cancel);
+            var d = new FuncChainTask<T>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);
@@ -267,7 +267,7 @@
 
         public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
             Safe.ArgumentNotNull(that, "that");
-            var d = new FuncChainTask<T,T2>(success, error, cancel);
+            var d = new FuncChainTask<T,T2>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
             if (success != null)
                 d.CancellationRequested(that.Cancel);