Mercurial > pub > ImplabNet
changeset 10:aa33d0bb8c0c promises
implemeted new cancellable promises concept
author | cin |
---|---|
date | Sun, 03 Nov 2013 18:07:38 +0400 (2013-11-03) |
parents | c82e0dfbb4dd |
children | 6ec82bf68c8e |
files | .hgignore Implab.Test/AsyncTests.cs Implab.Test/Implab.Test.csproj Implab.Test/PromiseHelper.cs Implab.suo Implab/IPromise.cs Implab/Promise.cs Implab/TaskController.cs |
diffstat | 8 files changed, 185 insertions(+), 119 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Sat Nov 02 00:55:47 2013 +0400 +++ b/.hgignore Sun Nov 03 18:07:38 2013 +0400 @@ -10,3 +10,4 @@ Implab.Fx/bin/ Implab.Fx.Test/bin/ Implab.Fx.Test/obj/ +_ReSharper.Implab/
--- a/Implab.Test/AsyncTests.cs Sat Nov 02 00:55:47 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Sun Nov 03 18:07:38 2013 +0400 @@ -1,10 +1,9 @@ using System; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Implab; using System.Reflection; using System.Threading; -namespace Implab.Tests +namespace Implab.Test { [TestClass] public class AsyncTests @@ -90,12 +89,39 @@ public void PoolTest () { var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => { - return Thread.CurrentThread.ManagedThreadId; - }); + var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); Assert.AreNotEqual (pid, p.Join ()); } + + [TestMethod] + public void ComplexCase1Test() { + var flags = new bool[3]; + + // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) + + var p = PromiseHelper + .Sleep(200, "Alan") + .Cancelled(() => flags[0] = true) + .Chain(x => + PromiseHelper + .Sleep(200, "Hi, " + x) + .Map( y => y ) + .Cancelled(() => flags[1] = true) + ) + .Cancelled(() => flags[2] = true); + Thread.Sleep(300); + p.Cancel(); + try { + Assert.AreEqual(p.Join(), "Hi, Alan"); + Assert.Fail("Shouldn't get here"); + } catch(OperationCanceledException) { + } + + Assert.IsFalse(flags[0]); + Assert.IsTrue(flags[1]); + Assert.IsTrue(flags[2]); + } } }
--- a/Implab.Test/Implab.Test.csproj Sat Nov 02 00:55:47 2013 +0400 +++ b/Implab.Test/Implab.Test.csproj Sun Nov 03 18:07:38 2013 +0400 @@ -46,6 +46,7 @@ </ItemGroup> <ItemGroup> <Compile Include="AsyncTests.cs" /> + <Compile Include="PromiseHelper.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab.Test/PromiseHelper.cs Sun Nov 03 18:07:38 2013 +0400 @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Test { + class PromiseHelper { + public static Promise<T> Sleep<T>(int timeout, T retVal) { + return AsyncPool.Invoke(() => { + Thread.Sleep(timeout); + return retVal; + }); + } + } +}
--- a/Implab/IPromise.cs Sat Nov 02 00:55:47 2013 +0400 +++ b/Implab/IPromise.cs Sun Nov 03 18:07:38 2013 +0400 @@ -29,5 +29,12 @@ /// <param name="dependencies">Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise</param> /// <returns></returns> bool Cancel(bool dependencies); + + /// <summary> + /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the + /// handler will be invoked immediatelly. + /// </summary> + /// <param name="handler">The handler</param> + void HandleCancelled(Action handler); } }
--- a/Implab/Promise.cs Sat Nov 02 00:55:47 2013 +0400 +++ b/Implab/Promise.cs Sun Nov 03 18:07:38 2013 +0400 @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; using System.Diagnostics; using System.Threading; @@ -10,9 +8,9 @@ public delegate void ErrorHandler(Exception e); - public delegate void ResultHandler<T>(T result); - public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); - public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); + public delegate void ResultHandler<in T>(T result); + public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result); + public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result); /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -55,20 +53,19 @@ public ErrorHandler errorHandler; } - IPromise m_parent; + readonly IPromise m_parent; LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); - object m_lock = new Object(); - bool m_cancellable; + readonly object m_lock = new Object(); + readonly bool m_cancellable; + int m_childrenCount = 0; PromiseState m_state; T m_result; Exception m_error; - int m_childrenCount; - public Promise() { m_cancellable = true; } @@ -76,15 +73,14 @@ public Promise(IPromise parent, bool cancellable) { m_cancellable = cancellable; m_parent = parent; + if (parent != null) + parent.HandleCancelled(InternalCancel); } - /// <summary> - /// Событие, возникающее при отмене асинхронной операции. - /// </summary> - /// <description> - /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. - /// </description> - public event EventHandler Cancelled; + void InternalCancel() { + // don't try to cancel parent :) + Cancel(false); + } /// <summary> /// Выполняет обещание, сообщая об успешном выполнении. @@ -101,14 +97,7 @@ m_state = PromiseState.Resolved; } - // state has been changed to rejected new handlers can't be added - - foreach (var handler in m_resultHandlers) - InvokeHandler(handler); - - /* ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); */ + OnStateChanged(); } /// <summary> @@ -126,14 +115,7 @@ m_state = PromiseState.Rejected; } - // state has been changed to rejected new handlers can't be added - - foreach (var handler in m_resultHandlers) - InvokeHandler(handler); - - /*ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler);*/ + OnStateChanged(); } /// <summary> @@ -144,6 +126,39 @@ return Cancel(true); } + protected virtual void OnStateChanged() { + switch (m_state) { + case PromiseState.Resolved: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.resultHandler != null) + resultHandlerInfo.resultHandler(m_result); + } catch (Exception e) { + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(e); + } catch { } + } + break; + case PromiseState.Cancelled: + foreach (var cancelHandler in m_cancelHandlers) + cancelHandler(); + break; + case PromiseState.Rejected: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(m_error); + } catch { } + break; + default: + throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); + } + + m_resultHandlers = null; + m_cancelHandlers = null; + } + /// <summary> /// Добавляет обработчики событий выполнения обещания. /// </summary> @@ -162,15 +177,11 @@ if (success != null) handlerInfo.resultHandler = x => { - try { - success(x); - medium.Resolve(x); - } catch (Exception e) { - medium.Reject(e); - } + success(x); + medium.Resolve(x); }; else - handlerInfo.resultHandler = x => medium.Resolve(x); + handlerInfo.resultHandler = medium.Resolve; if (error != null) handlerInfo.errorHandler = x => { @@ -180,7 +191,7 @@ medium.Reject(x); }; else - handlerInfo.errorHandler = x => medium.Reject(x); + handlerInfo.errorHandler = medium.Reject; AddHandler(handlerInfo); @@ -203,6 +214,7 @@ AddHandler(new ResultHandlerInfo { resultHandler = x => { + // to avoid handler being called multiple times we handle exception by ourselfs try { handler(); medium.Resolve(x); @@ -234,20 +246,15 @@ throw new ArgumentNullException("mapper"); // создаем прицепленное обещание - Promise<TNew> chained = new Promise<TNew>(); + var chained = new Promise<TNew>(); AddHandler(new ResultHandlerInfo() { - resultHandler = delegate(T result) { - try { - // если преобразование выдаст исключение, то сработает reject сцепленного deferred - chained.Resolve(mapper(result)); - } catch (Exception e) { - chained.Reject(e); - } - }, + resultHandler = result => chained.Resolve(mapper(result)), errorHandler = delegate(Exception e) { if (error != null) - error(e); + try { + error(e); + } catch { } // в случае ошибки нужно передать исключение дальше по цепочке chained.Reject(e); } @@ -276,19 +283,21 @@ // создать посредника, к которому будут подвызяваться следующие обработчики. // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы // передать через него результаты работы. - Promise<TNew> medium = new Promise<TNew>(); + var medium = new Promise<TNew>(this, true); - AddHandler(new ResultHandlerInfo() { + AddHandler(new ResultHandlerInfo { resultHandler = delegate(T result) { - try { - chained(result).Then( - x => medium.Resolve(x), - e => medium.Reject(e) - ); - } catch (Exception e) { - // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке - medium.Reject(e); - } + if (medium.State == PromiseState.Cancelled) + return; + + var promise = chained(result); + + // notify chained operation that it's not needed + medium.Cancelled(() => promise.Cancel()); + promise.Then( + medium.Resolve, + medium.Reject + ); }, errorHandler = delegate(Exception e) { if (error != null) @@ -305,6 +314,22 @@ return Chain(chained, null); } + public Promise<T> Cancelled(Action handler) { + if (handler == null) + return this; + lock (m_lock) { + if (m_state == PromiseState.Unresolved) + m_cancelHandlers.AddLast(handler); + else if (m_state == PromiseState.Cancelled) + handler(); + } + return this; + } + + public void HandleCancelled(Action handler) { + Cancelled(handler); + } + /// <summary> /// Дожидается отложенного обещания и в случае успеха, возвращает /// его, результат, в противном случае бросает исключение. @@ -327,51 +352,37 @@ /// <param name="timeout">Время ожидания</param> /// <returns>Результат выполнения обещания</returns> public T Join(int timeout) { - ManualResetEvent evt = new ManualResetEvent(false); + var evt = new ManualResetEvent(false); Anyway(() => evt.Set()); + Cancelled(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); - if (m_error != null) - throw new TargetInvocationException(m_error); - else - return m_result; + switch (State) { + case PromiseState.Resolved: + return m_result; + case PromiseState.Cancelled: + throw new OperationCanceledException(); + case PromiseState.Rejected: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", State)); + } } public T Join() { return Join(Timeout.Infinite); } - /// <summary> - /// Данный метод последовательно извлекает обработчики обещания и когда - /// их больше не осталось - ставит состояние "разрешено". - /// </summary> - /// <param name="handler">Информация об обработчике</param> - /// <returns>Признак того, что еще остались обработчики в очереди</returns> - bool FetchNextHandler(out ResultHandlerInfo handler) { - handler = default(ResultHandlerInfo); - - lock (this) { - Debug.Assert(m_state != PromiseState.Unresolved); - - if (m_resultHandlers.Count > 0) { - handler = m_resultHandlers.First.Value; - m_resultHandlers.RemoveFirst(); - return true; - } else { - return false; - } - } - } - void AddHandler(ResultHandlerInfo handler) { bool invokeRequired = false; - lock (this) { - if (m_state == PromiseState.Unresolved) + lock (m_lock) { + m_childrenCount++; + if (m_state == PromiseState.Unresolved) { m_resultHandlers.AddLast(handler); - else + } else invokeRequired = true; } @@ -381,18 +392,27 @@ } void InvokeHandler(ResultHandlerInfo handler) { - if (m_error == null) { - try { - if (handler.resultHandler != null) - handler.resultHandler(m_result); - } catch { } - } - - if (m_error != null) { - try { - if (handler.errorHandler != null) - handler.errorHandler(m_error); - } catch { } + switch (m_state) { + case PromiseState.Resolved: + try { + if (handler.resultHandler != null) + handler.resultHandler(m_result); + } catch (Exception e) { + try { + if (handler.errorHandler != null) + handler.errorHandler(e); + } catch { } + } + break; + case PromiseState.Rejected: + try { + if (handler.errorHandler != null) + handler.errorHandler(m_error); + } catch { } + break; + default: + // do nothing + return; } } @@ -426,15 +446,11 @@ } } - if (dependencies && m_parent != null && m_parent.IsExclusive) { - // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive) - m_parent.Cancel(true); - } + if (result) + OnStateChanged(); - if (result) { - // state has been changed to cancelled, new handlers can't be added - foreach (var handler in m_cancelHandlers) - handler(); + if (dependencies && m_parent != null && m_parent.IsExclusive) { + m_parent.Cancel(true); } return result;