Mercurial > pub > ImplabNet
diff Implab/Promise.cs @ 10:aa33d0bb8c0c promises
implemeted new cancellable promises concept
author | cin |
---|---|
date | Sun, 03 Nov 2013 18:07:38 +0400 |
parents | c82e0dfbb4dd |
children | 6ec82bf68c8e |
line wrap: on
line diff
--- a/Implab/Promise.cs Sat Nov 02 00:55:47 2013 +0400 +++ b/Implab/Promise.cs Sun Nov 03 18:07:38 2013 +0400 @@ -1,8 +1,6 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; using System.Diagnostics; using System.Threading; @@ -10,9 +8,9 @@ public delegate void ErrorHandler(Exception e); - public delegate void ResultHandler<T>(T result); - public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); - public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); + public delegate void ResultHandler<in T>(T result); + public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result); + public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result); /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -55,20 +53,19 @@ public ErrorHandler errorHandler; } - IPromise m_parent; + readonly IPromise m_parent; LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); - object m_lock = new Object(); - bool m_cancellable; + readonly object m_lock = new Object(); + readonly bool m_cancellable; + int m_childrenCount = 0; PromiseState m_state; T m_result; Exception m_error; - int m_childrenCount; - public Promise() { m_cancellable = true; } @@ -76,15 +73,14 @@ public Promise(IPromise parent, bool cancellable) { m_cancellable = cancellable; m_parent = parent; + if (parent != null) + parent.HandleCancelled(InternalCancel); } - /// <summary> - /// Событие, возникающее при отмене асинхронной операции. - /// </summary> - /// <description> - /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. - /// </description> - public event EventHandler Cancelled; + void InternalCancel() { + // don't try to cancel parent :) + Cancel(false); + } /// <summary> /// Выполняет обещание, сообщая об успешном выполнении. @@ -101,14 +97,7 @@ m_state = PromiseState.Resolved; } - // state has been changed to rejected new handlers can't be added - - foreach (var handler in m_resultHandlers) - InvokeHandler(handler); - - /* ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); */ + OnStateChanged(); } /// <summary> @@ -126,14 +115,7 @@ m_state = PromiseState.Rejected; } - // state has been changed to rejected new handlers can't be added - - foreach (var handler in m_resultHandlers) - InvokeHandler(handler); - - /*ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler);*/ + OnStateChanged(); } /// <summary> @@ -144,6 +126,39 @@ return Cancel(true); } + protected virtual void OnStateChanged() { + switch (m_state) { + case PromiseState.Resolved: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.resultHandler != null) + resultHandlerInfo.resultHandler(m_result); + } catch (Exception e) { + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(e); + } catch { } + } + break; + case PromiseState.Cancelled: + foreach (var cancelHandler in m_cancelHandlers) + cancelHandler(); + break; + case PromiseState.Rejected: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(m_error); + } catch { } + break; + default: + throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); + } + + m_resultHandlers = null; + m_cancelHandlers = null; + } + /// <summary> /// Добавляет обработчики событий выполнения обещания. /// </summary> @@ -162,15 +177,11 @@ if (success != null) handlerInfo.resultHandler = x => { - try { - success(x); - medium.Resolve(x); - } catch (Exception e) { - medium.Reject(e); - } + success(x); + medium.Resolve(x); }; else - handlerInfo.resultHandler = x => medium.Resolve(x); + handlerInfo.resultHandler = medium.Resolve; if (error != null) handlerInfo.errorHandler = x => { @@ -180,7 +191,7 @@ medium.Reject(x); }; else - handlerInfo.errorHandler = x => medium.Reject(x); + handlerInfo.errorHandler = medium.Reject; AddHandler(handlerInfo); @@ -203,6 +214,7 @@ AddHandler(new ResultHandlerInfo { resultHandler = x => { + // to avoid handler being called multiple times we handle exception by ourselfs try { handler(); medium.Resolve(x); @@ -234,20 +246,15 @@ throw new ArgumentNullException("mapper"); // создаем прицепленное обещание - Promise<TNew> chained = new Promise<TNew>(); + var chained = new Promise<TNew>(); AddHandler(new ResultHandlerInfo() { - resultHandler = delegate(T result) { - try { - // если преобразование выдаст исключение, то сработает reject сцепленного deferred - chained.Resolve(mapper(result)); - } catch (Exception e) { - chained.Reject(e); - } - }, + resultHandler = result => chained.Resolve(mapper(result)), errorHandler = delegate(Exception e) { if (error != null) - error(e); + try { + error(e); + } catch { } // в случае ошибки нужно передать исключение дальше по цепочке chained.Reject(e); } @@ -276,19 +283,21 @@ // создать посредника, к которому будут подвызяваться следующие обработчики. // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы // передать через него результаты работы. - Promise<TNew> medium = new Promise<TNew>(); + var medium = new Promise<TNew>(this, true); - AddHandler(new ResultHandlerInfo() { + AddHandler(new ResultHandlerInfo { resultHandler = delegate(T result) { - try { - chained(result).Then( - x => medium.Resolve(x), - e => medium.Reject(e) - ); - } catch (Exception e) { - // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке - medium.Reject(e); - } + if (medium.State == PromiseState.Cancelled) + return; + + var promise = chained(result); + + // notify chained operation that it's not needed + medium.Cancelled(() => promise.Cancel()); + promise.Then( + medium.Resolve, + medium.Reject + ); }, errorHandler = delegate(Exception e) { if (error != null) @@ -305,6 +314,22 @@ return Chain(chained, null); } + public Promise<T> Cancelled(Action handler) { + if (handler == null) + return this; + lock (m_lock) { + if (m_state == PromiseState.Unresolved) + m_cancelHandlers.AddLast(handler); + else if (m_state == PromiseState.Cancelled) + handler(); + } + return this; + } + + public void HandleCancelled(Action handler) { + Cancelled(handler); + } + /// <summary> /// Дожидается отложенного обещания и в случае успеха, возвращает /// его, результат, в противном случае бросает исключение. @@ -327,51 +352,37 @@ /// <param name="timeout">Время ожидания</param> /// <returns>Результат выполнения обещания</returns> public T Join(int timeout) { - ManualResetEvent evt = new ManualResetEvent(false); + var evt = new ManualResetEvent(false); Anyway(() => evt.Set()); + Cancelled(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); - if (m_error != null) - throw new TargetInvocationException(m_error); - else - return m_result; + switch (State) { + case PromiseState.Resolved: + return m_result; + case PromiseState.Cancelled: + throw new OperationCanceledException(); + case PromiseState.Rejected: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", State)); + } } public T Join() { return Join(Timeout.Infinite); } - /// <summary> - /// Данный метод последовательно извлекает обработчики обещания и когда - /// их больше не осталось - ставит состояние "разрешено". - /// </summary> - /// <param name="handler">Информация об обработчике</param> - /// <returns>Признак того, что еще остались обработчики в очереди</returns> - bool FetchNextHandler(out ResultHandlerInfo handler) { - handler = default(ResultHandlerInfo); - - lock (this) { - Debug.Assert(m_state != PromiseState.Unresolved); - - if (m_resultHandlers.Count > 0) { - handler = m_resultHandlers.First.Value; - m_resultHandlers.RemoveFirst(); - return true; - } else { - return false; - } - } - } - void AddHandler(ResultHandlerInfo handler) { bool invokeRequired = false; - lock (this) { - if (m_state == PromiseState.Unresolved) + lock (m_lock) { + m_childrenCount++; + if (m_state == PromiseState.Unresolved) { m_resultHandlers.AddLast(handler); - else + } else invokeRequired = true; } @@ -381,18 +392,27 @@ } void InvokeHandler(ResultHandlerInfo handler) { - if (m_error == null) { - try { - if (handler.resultHandler != null) - handler.resultHandler(m_result); - } catch { } - } - - if (m_error != null) { - try { - if (handler.errorHandler != null) - handler.errorHandler(m_error); - } catch { } + switch (m_state) { + case PromiseState.Resolved: + try { + if (handler.resultHandler != null) + handler.resultHandler(m_result); + } catch (Exception e) { + try { + if (handler.errorHandler != null) + handler.errorHandler(e); + } catch { } + } + break; + case PromiseState.Rejected: + try { + if (handler.errorHandler != null) + handler.errorHandler(m_error); + } catch { } + break; + default: + // do nothing + return; } } @@ -426,15 +446,11 @@ } } - if (dependencies && m_parent != null && m_parent.IsExclusive) { - // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive) - m_parent.Cancel(true); - } + if (result) + OnStateChanged(); - if (result) { - // state has been changed to cancelled, new handlers can't be added - foreach (var handler in m_cancelHandlers) - handler(); + if (dependencies && m_parent != null && m_parent.IsExclusive) { + m_parent.Cancel(true); } return result;