Mercurial > pub > ImplabNet
changeset 149:eb793fbbe4ea v2
fixed promises cancellation
author | cin |
---|---|
date | Wed, 06 May 2015 17:11:27 +0300 |
parents | e6d4b41f0101 |
children | 3258399cba83 |
files | Implab.Test/AsyncTests.cs Implab.Test/CancelationTests.cs Implab.Test/PromiseHelper.cs Implab/ActionChainTask.cs Implab/ActionChainTaskBase.cs Implab/ActionChainTaskT.cs Implab/ActionTask.cs Implab/ActionTaskBase.cs Implab/ActionTaskT.cs Implab/FuncChainTask.cs Implab/FuncChainTaskBase.cs Implab/FuncChainTaskT.cs Implab/FuncTask.cs Implab/FuncTaskBase.cs Implab/FuncTaskT.cs Implab/Parallels/WorkerPool.cs Implab/PromiseExtensions.cs |
diffstat | 17 files changed, 153 insertions(+), 73 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab.Test/AsyncTests.cs Wed May 06 17:11:27 2015 +0300 @@ -746,33 +746,31 @@ public void ChainedCancel2Test() { // при отмене цепочки обещаний, вложенные операции также должны отменяться var pSurvive = new Promise<bool>(); - var hemStarted = new ManualResetEvent(false); + var hemStarted = new Signal(); var p = PromiseHelper .Sleep(1, "Hi, HAL!") - .Chain(x => { + .Chain(() => { hemStarted.Set(); // запускаем две асинхронные операции var result = PromiseHelper - .Sleep(100000000, "HEM ENABLED!!!") - .Then(s => { - pSurvive.Resolve(false); - return s; - }); + .Sleep(2000, "HEM ENABLED!!!") + .Then(() => pSurvive.Resolve(false)); result .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled); - + return result; }); - hemStarted.WaitOne(); + hemStarted.Wait(); p.Cancel(); try { p.Join(); + Assert.Fail(); } catch (OperationCanceledException) { - Assert.IsTrue(pSurvive.Join()); } + Assert.IsTrue(pSurvive.Join()); } [TestMethod]
--- a/Implab.Test/CancelationTests.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab.Test/CancelationTests.cs Wed May 06 17:11:27 2015 +0300 @@ -48,7 +48,7 @@ bool run = false; var task = new ActionTask(() => { run = true; - }, null, null); + }, null, null, true); // request cancelation task.Cancel(); @@ -65,7 +65,7 @@ var task = new ActionTask(() => { started.Set(); finish.Wait(); - }, null, null); + }, null, null, true); AsyncPool.RunThread(() => { task.Resolve(); @@ -85,14 +85,18 @@ [TestMethod] public void CancelTaskChainFromBottom() { + var started = new Signal(); var check1 = new Signal(); var requested = false; var p1 = AsyncPool.RunThread(token => { token.CancellationRequested(reason => requested = true); + started.Set(); check1.Wait(); token.CancelOperationIfRequested(); }); + started.Wait(); + var p2 = p1.Then(() => { });
--- a/Implab.Test/PromiseHelper.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab.Test/PromiseHelper.cs Wed May 06 17:11:27 2015 +0300 @@ -4,7 +4,8 @@ namespace Implab.Test { static class PromiseHelper { public static IPromise<T> Sleep<T>(int timeout, T retVal) { - return AsyncPool.Invoke(() => { + return AsyncPool.Invoke((ct) => { + ct.CancellationRequested(ct.CancelOperation); Thread.Sleep(timeout); return retVal; });
--- a/Implab/ActionChainTask.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionChainTask.cs Wed May 06 17:11:27 2015 +0300 @@ -4,14 +4,16 @@ public class ActionChainTask : ActionChainTaskBase, IDeferred { readonly Func<IPromise> m_task; - public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) { + public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } public void Resolve() { if (m_task != null && LockCancelation()) { try { - Observe(m_task()); + var p = m_task(); + p.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(p.Cancel); } catch(Exception err) { HandleErrorInternal(err); }
--- a/Implab/ActionChainTaskBase.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionChainTaskBase.cs Wed May 06 17:11:27 2015 +0300 @@ -8,9 +8,11 @@ int m_cancelationLock; - protected ActionChainTaskBase( Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) { + protected ActionChainTaskBase(Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { @@ -21,21 +23,26 @@ public override void CancelOperation(Exception reason) { - if (m_cancel != null && LockCancelation()) { - try { - Observe(m_cancel(reason)); - } catch(Exception err) { - HandleErrorInternal(err); + if (LockCancelation()) { + if (m_cancel != null) { + try { + m_cancel(reason).On(SetResult, SetError, SetCancelled); + } catch (Exception err) { + HandleErrorInternal(err); + } + } else { + SetCancelled(reason); } } - } protected void HandleErrorInternal(Exception error) { if (m_error != null) { try { - Observe(m_error(error)); - } catch(Exception err) { + var p = m_error(error); + p.On(SetResult,SetError,SetCancelled); + CancellationRequested(p.Cancel); + } catch (Exception err) { SetError(err); } } else { @@ -43,17 +50,6 @@ } } - protected void Observe(IPromise operation) { - if (operation == null) - throw new NullReferenceException("The task returned null promise"); - - // pass operation results to the current promise - operation.On(SetResult, SetError, SetCancelled); - - // pass the cancelation request - CancellationRequested(operation.Cancel); - } - protected bool LockCancelation() { return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); }
--- a/Implab/ActionChainTaskT.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionChainTaskT.cs Wed May 06 17:11:27 2015 +0300 @@ -4,14 +4,16 @@ public class ActionChainTask<T> : ActionChainTaskBase, IDeferred<T> { readonly Func<T, IPromise> m_task; - public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) : base(error,cancel) { + public ActionChainTask(Func<T, IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; } public void Resolve(T value) { if (m_task != null && LockCancelation()) { try { - Observe(m_task(value)); + var p = m_task(value); + p.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(p.Cancel); } catch(Exception err) { HandleErrorInternal(err); }
--- a/Implab/ActionTask.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionTask.cs Wed May 06 17:11:27 2015 +0300 @@ -3,7 +3,7 @@ namespace Implab { public class ActionTask : ActionTaskBase, IDeferred { readonly Action m_task; - public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) { + public ActionTask(Action task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; }
--- a/Implab/ActionTaskBase.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionTaskBase.cs Wed May 06 17:11:27 2015 +0300 @@ -8,9 +8,11 @@ int m_cancelationLock; - protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel) { + protected ActionTaskBase( Action<Exception> error, Action<Exception> cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) {
--- a/Implab/ActionTaskT.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/ActionTaskT.cs Wed May 06 17:11:27 2015 +0300 @@ -3,7 +3,7 @@ namespace Implab { public class ActionTask<T> : ActionTaskBase, IDeferred<T> { readonly Action<T> m_task; - public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel) : base(error,cancel) { + public ActionTask(Action<T> task, Action<Exception> error, Action<Exception> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; }
--- a/Implab/FuncChainTask.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncChainTask.cs Wed May 06 17:11:27 2015 +0300 @@ -4,14 +4,17 @@ public class FuncChainTask<TResult> : FuncChainTaskBase<TResult>, IDeferred { readonly Func<IPromise<TResult>> m_task; - public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){ + public FuncChainTask(Func<IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) + : base(error, cancel, autoCancellable) { m_task = task; } public void Resolve() { if (m_task != null && LockCancelation()) { try { - Observe(m_task()); + var operation = m_task(); + operation.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(operation.Cancel); } catch (Exception err) { HandleErrorInternal(err); }
--- a/Implab/FuncChainTaskBase.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncChainTaskBase.cs Wed May 06 17:11:27 2015 +0300 @@ -8,9 +8,11 @@ int m_cancelationLock; - protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) { + protected FuncChainTaskBase( Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) { @@ -19,11 +21,15 @@ } public override void CancelOperation(Exception reason) { - if (m_cancel != null && LockCancelation()) { - try { - Observe(m_cancel(reason)); - } catch(Exception err) { - HandleErrorInternal(err); + if (LockCancelation()) { + if (m_cancel != null) { + try { + m_cancel(reason).On(SetResult, HandleErrorInternal, SetCancelled); + } catch (Exception err) { + HandleErrorInternal(err); + } + } else { + SetCancelled(reason); } } @@ -32,7 +38,10 @@ protected void HandleErrorInternal(Exception error) { if (m_error != null) { try { - Observe(m_error(error)); + var operation = m_error(error); + + operation.On(SetResult, SetError, SetCancelled); + CancellationRequested(operation.Cancel); } catch(Exception err) { SetError(err); } @@ -41,17 +50,6 @@ } } - protected void Observe(IPromise<TResult> operation) { - if (operation == null) - throw new NullReferenceException("The task returned null promise"); - - // pass operation results to the current promise - operation.On(SetResult, SetError, SetCancelled); - - // pass the cancelation request - CancellationRequested(operation.Cancel); - } - protected bool LockCancelation() { return 0 == Interlocked.CompareExchange(ref m_cancelationLock, 1, 0); }
--- a/Implab/FuncChainTaskT.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncChainTaskT.cs Wed May 06 17:11:27 2015 +0300 @@ -4,14 +4,16 @@ public class FuncChainTask<TArg,TResult> : FuncChainTaskBase<TResult>, IDeferred<TArg> { readonly Func<TArg, IPromise<TResult>> m_task; - public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel) : base(error, cancel){ + public FuncChainTask(Func<TArg, IPromise<TResult>> task, Func<Exception, IPromise<TResult>> error, Func<Exception, IPromise<TResult>> cancel, bool autoCancellable) : base(error, cancel, autoCancellable){ m_task = task; } public void Resolve(TArg value) { if (m_task != null && LockCancelation()) { try { - Observe(m_task(value)); + var operation = m_task(value); + operation.On(SetResult, HandleErrorInternal, SetCancelled); + CancellationRequested(operation.Cancel); } catch (Exception err) { HandleErrorInternal(err); }
--- a/Implab/FuncTask.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncTask.cs Wed May 06 17:11:27 2015 +0300 @@ -5,7 +5,7 @@ public class FuncTask<T> : FuncTaskBase<T>, IDeferred { readonly Func<T> m_task; - public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel) : base(error,cancel) { + public FuncTask(Func<T> task, Func<Exception, T> error, Func<Exception, T> cancel, bool autoCancellable) : base(error, cancel, autoCancellable) { m_task = task; }
--- a/Implab/FuncTaskBase.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncTaskBase.cs Wed May 06 17:11:27 2015 +0300 @@ -8,9 +8,11 @@ int m_cancelationLock; - protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel) { + protected FuncTaskBase( Func<Exception, TResult> error, Func<Exception, TResult> cancel, bool autoCancellable) { m_error = error; m_cancel = cancel; + if (autoCancellable) + CancellationRequested(CancelOperation); } public void Reject(Exception error) {
--- a/Implab/FuncTaskT.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/FuncTaskT.cs Wed May 06 17:11:27 2015 +0300 @@ -4,7 +4,7 @@ public class FuncTask<TArg, TResult> : FuncTaskBase<TResult>, IDeferred<TArg> { readonly Func<TArg, TResult> m_task; - public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel) : base(error,cancel) { + public FuncTask(Func<TArg, TResult> task, Func<Exception, TResult> error,Func<Exception, TResult> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; }
--- a/Implab/Parallels/WorkerPool.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/Parallels/WorkerPool.cs Wed May 06 17:11:27 2015 +0300 @@ -30,7 +30,49 @@ InitPool(); } - public Promise<T> Invoke<T>(Func<T> task) { + public IPromise<T> Invoke<T>(Func<T> task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new FuncTask<T>(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise Invoke(Action task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new ActionTask(task, null, null, true); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + + promise.Resolve(); + + TraceContext.Instance.Leave(); + }); + + return promise; + } + + public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { if (task == null) throw new ArgumentNullException("task"); if (IsDisposed) @@ -43,7 +85,35 @@ EnqueueTask(delegate { TraceContext.Instance.EnterLogicalOperation(lop, false); try { - promise.Resolve(task()); + if (!promise.CancelOperationIfRequested()) + promise.Resolve(task(promise)); + } catch (Exception e) { + promise.Reject(e); + } finally { + TraceContext.Instance.Leave(); + } + }); + + return promise; + } + + public IPromise Invoke<T>(Action<ICancellationToken> task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new Promise(); + + var lop = TraceContext.Instance.CurrentOperation; + + EnqueueTask(delegate { + TraceContext.Instance.EnterLogicalOperation(lop, false); + try { + if (!promise.CancelOperationIfRequested()) { + task(promise); + promise.Resolve(); + } } catch (Exception e) { promise.Reject(e); } finally {
--- a/Implab/PromiseExtensions.cs Wed Apr 15 07:30:20 2015 +0300 +++ b/Implab/PromiseExtensions.cs Wed May 06 17:11:27 2015 +0300 @@ -178,7 +178,7 @@ public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new ActionTask(success, error, cancel); + var d = new ActionTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -196,7 +196,7 @@ public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask<T>(success, error, cancel); + var d = new FuncTask<T>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -213,7 +213,7 @@ public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask<T,T2>(success, error, cancel); + var d = new FuncTask<T,T2>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -232,7 +232,7 @@ public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new ActionChainTask(success, error, cancel); + var d = new ActionChainTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -250,7 +250,7 @@ public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask<T>(success, error, cancel); + var d = new FuncChainTask<T>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel); @@ -267,7 +267,7 @@ public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) { Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask<T,T2>(success, error, cancel); + var d = new FuncChainTask<T,T2>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); if (success != null) d.CancellationRequested(that.Cancel);