Mercurial > pub > ImplabNet
diff Implab/Promise.cs @ 18:0c924dff5498
Слияние с promises
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:27:04 +0400 |
parents | 5a4b735ba669 |
children | e3935fdf59a2 |
line wrap: on
line diff
--- a/Implab/Promise.cs Mon Oct 21 02:34:31 2013 +0400 +++ b/Implab/Promise.cs Fri Nov 08 01:27:04 2013 +0400 @@ -1,18 +1,16 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; using System.Diagnostics; using System.Threading; namespace Implab { public delegate void ErrorHandler(Exception e); - - public delegate void ResultHandler<T>(T result); - public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); - public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); + public delegate T ErrorHandler<out T>(Exception e); + public delegate void ResultHandler<in T>(T result); + public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result); + public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result); /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -48,23 +46,23 @@ /// только инициатор обещания иначе могут возникнуть противоречия. /// </para> /// </remarks> - public class Promise<T> { + public class Promise<T> : IPromise { struct ResultHandlerInfo { public ResultHandler<T> resultHandler; public ErrorHandler errorHandler; } - enum State { - Unresolved, - Resolving, - Resolved, - Cancelled - } + readonly IPromise m_parent; + + LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); + LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); - LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>(); - State m_state; - bool m_cancellable; + readonly object m_lock = new Object(); + readonly bool m_cancellable; + int m_childrenCount = 0; + + PromiseState m_state; T m_result; Exception m_error; @@ -72,13 +70,17 @@ m_cancellable = true; } - /// <summary> - /// Событие, возникающее при отмене асинхронной операции. - /// </summary> - /// <description> - /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. - /// </description> - public event EventHandler Cancelled; + public Promise(IPromise parent, bool cancellable) { + m_cancellable = cancellable; + m_parent = parent; + if (parent != null) + parent.HandleCancelled(InternalCancel); + } + + void InternalCancel() { + // don't try to cancel parent :) + Cancel(false); + } /// <summary> /// Выполняет обещание, сообщая об успешном выполнении. @@ -86,38 +88,39 @@ /// <param name="result">Результат выполнения.</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Resolve(T result) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_result = result; - m_state = State.Resolving; + m_state = PromiseState.Resolved; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// <summary> /// Выполняет обещание, сообщая об ошибке /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Reject(Exception error) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_error = error; - m_state = State.Resolving; + m_state = PromiseState.Rejected; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// <summary> @@ -125,47 +128,31 @@ /// </summary> /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> public bool Cancel() { - lock (this) { - if (m_state == State.Unresolved && m_cancellable) { - m_state = State.Cancelled; - EventHandler temp = Cancelled; - - if (temp != null) - temp(this, new EventArgs()); - - return true; - } else - return false; - } + return Cancel(true); } /// <summary> - /// Добавляет обработчики событий выполнения обещания. + /// Adds new handlers to this promise. /// </summary> - /// <param name="success">Обработчик успешного выполнения обещания. - /// Данному обработчику будет передан результат выполнения операции.</param> - /// <param name="error">Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении операции.</param> - /// <returns>Само обещание</returns> + /// <param name="success">The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter.</param> + /// <param name="error">Handles an exception that may occur during the operation.</param> + /// <returns>The new promise chained to this one.</returns> public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) { if (success == null && error == null) return this; - var medium = new Promise<T>(); + var medium = new Promise<T>(this, true); var handlerInfo = new ResultHandlerInfo(); if (success != null) handlerInfo.resultHandler = x => { - try { - success(x); - medium.Resolve(x); - } catch (Exception e) { - medium.Reject(e); - } + success(x); + medium.Resolve(x); }; else - handlerInfo.resultHandler = x => medium.Resolve(x); + handlerInfo.resultHandler = medium.Resolve; if (error != null) handlerInfo.errorHandler = x => { @@ -175,21 +162,106 @@ medium.Reject(x); }; else - handlerInfo.errorHandler = x => medium.Reject(x); + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; + } + + /// <summary> + /// Adds new handlers to this promise. + /// </summary> + /// <param name="success">The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter.</param> + /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param> + /// <returns>The new promise chained to this one.</returns> + public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) { + if (success == null && error == null) + return this; + + var medium = new Promise<T>(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + if (error != null) + handlerInfo.errorHandler = x => { + try { + medium.Resolve(error(x)); + } catch { } + medium.Reject(x); + }; + else + handlerInfo.errorHandler = medium.Reject; AddHandler(handlerInfo); return medium; } + public Promise<T> Then(ResultHandler<T> success) { - return Then(success, null); + if (success == null) + return this; + + var medium = new Promise<T>(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; } public Promise<T> Error(ErrorHandler error) { return Then(null, error); } + /// <summary> + /// Handles error and allows to keep the promise. + /// </summary> + /// <remarks> + /// If the specified handler throws an exception, this exception will be used to reject the promise. + /// </remarks> + /// <param name="handler">The error handler which returns the result of the promise.</param> + /// <returns>New promise.</returns> + public Promise<T> Error(ErrorHandler<T> handler) { + if (handler == null) + return this; + + var medium = new Promise<T>(this, true); + + AddHandler(new ResultHandlerInfo { + errorHandler = e => { + try { + medium.Resolve(handler(e)); + } catch (Exception e2) { + medium.Reject(e2); + } + } + }); + + return medium; + } + public Promise<T> Anyway(Action handler) { if (handler == null) return this; @@ -198,6 +270,7 @@ AddHandler(new ResultHandlerInfo { resultHandler = x => { + // to avoid handler being called multiple times we handle exception by ourselfs try { handler(); medium.Resolve(x); @@ -229,20 +302,15 @@ throw new ArgumentNullException("mapper"); // создаем прицепленное обещание - Promise<TNew> chained = new Promise<TNew>(); + var chained = new Promise<TNew>(); AddHandler(new ResultHandlerInfo() { - resultHandler = delegate(T result) { - try { - // если преобразование выдаст исключение, то сработает reject сцепленного deferred - chained.Resolve(mapper(result)); - } catch (Exception e) { - chained.Reject(e); - } - }, + resultHandler = result => chained.Resolve(mapper(result)), errorHandler = delegate(Exception e) { if (error != null) - error(e); + try { + error(e); + } catch { } // в случае ошибки нужно передать исключение дальше по цепочке chained.Reject(e); } @@ -271,19 +339,21 @@ // создать посредника, к которому будут подвызяваться следующие обработчики. // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы // передать через него результаты работы. - Promise<TNew> medium = new Promise<TNew>(); + var medium = new Promise<TNew>(this, true); - AddHandler(new ResultHandlerInfo() { + AddHandler(new ResultHandlerInfo { resultHandler = delegate(T result) { - try { - chained(result).Then( - x => medium.Resolve(x), - e => medium.Reject(e) - ); - } catch (Exception e) { - // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке - medium.Reject(e); - } + if (medium.State == PromiseState.Cancelled) + return; + + var promise = chained(result); + + // notify chained operation that it's not needed + medium.Cancelled(() => promise.Cancel()); + promise.Then( + x => medium.Resolve(x), + e => medium.Reject(e) + ); }, errorHandler = delegate(Exception e) { if (error != null) @@ -300,6 +370,22 @@ return Chain(chained, null); } + public Promise<T> Cancelled(Action handler) { + if (handler == null) + return this; + lock (m_lock) { + if (m_state == PromiseState.Unresolved) + m_cancelHandlers.AddLast(handler); + else if (m_state == PromiseState.Cancelled) + handler(); + } + return this; + } + + public void HandleCancelled(Action handler) { + Cancelled(handler); + } + /// <summary> /// Дожидается отложенного обещания и в случае успеха, возвращает /// его, результат, в противном случае бросает исключение. @@ -322,52 +408,37 @@ /// <param name="timeout">Время ожидания</param> /// <returns>Результат выполнения обещания</returns> public T Join(int timeout) { - ManualResetEvent evt = new ManualResetEvent(false); + var evt = new ManualResetEvent(false); Anyway(() => evt.Set()); + Cancelled(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); - if (m_error != null) - throw new TargetInvocationException(m_error); - else - return m_result; + switch (State) { + case PromiseState.Resolved: + return m_result; + case PromiseState.Cancelled: + throw new OperationCanceledException(); + case PromiseState.Rejected: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", State)); + } } public T Join() { return Join(Timeout.Infinite); } - /// <summary> - /// Данный метод последовательно извлекает обработчики обещания и когда - /// их больше не осталось - ставит состояние "разрешено". - /// </summary> - /// <param name="handler">Информация об обработчике</param> - /// <returns>Признак того, что еще остались обработчики в очереди</returns> - bool FetchNextHandler(out ResultHandlerInfo handler) { - handler = default(ResultHandlerInfo); - - lock (this) { - Debug.Assert(m_state == State.Resolving); - - if (m_handlersChain.Count > 0) { - handler = m_handlersChain.First.Value; - m_handlersChain.RemoveFirst(); - return true; - } else { - m_state = State.Resolved; - return false; - } - } - } - void AddHandler(ResultHandlerInfo handler) { bool invokeRequired = false; - lock (this) { - if (m_state != State.Resolved) - m_handlersChain.AddLast(handler); - else + lock (m_lock) { + m_childrenCount++; + if (m_state == PromiseState.Unresolved) { + m_resultHandlers.AddLast(handler); + } else invokeRequired = true; } @@ -377,21 +448,102 @@ } void InvokeHandler(ResultHandlerInfo handler) { - if (m_error == null) { - try { - if (handler.resultHandler != null) - handler.resultHandler(m_result); - } catch { } - } - - if (m_error != null) { - try { - if (handler.errorHandler != null) - handler.errorHandler(m_error); - } catch { } + switch (m_state) { + case PromiseState.Resolved: + try { + if (handler.resultHandler != null) + handler.resultHandler(m_result); + } catch (Exception e) { + try { + if (handler.errorHandler != null) + handler.errorHandler(e); + } catch { } + } + break; + case PromiseState.Rejected: + try { + if (handler.errorHandler != null) + handler.errorHandler(m_error); + } catch { } + break; + default: + // do nothing + return; } } + protected virtual void OnStateChanged() { + switch (m_state) { + case PromiseState.Resolved: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.resultHandler != null) + resultHandlerInfo.resultHandler(m_result); + } catch (Exception e) { + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(e); + } catch { } + } + break; + case PromiseState.Cancelled: + foreach (var cancelHandler in m_cancelHandlers) + cancelHandler(); + break; + case PromiseState.Rejected: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(m_error); + } catch { } + break; + default: + throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); + } + + m_resultHandlers = null; + m_cancelHandlers = null; + } + + + + public bool IsExclusive { + get { + lock (m_lock) { + return m_childrenCount <= 1; + } + } + } + + public PromiseState State { + get { + lock (m_lock) { + return m_state; + } + } + } + + protected bool Cancel(bool dependencies) { + bool result; + + lock (m_lock) { + if (m_state == PromiseState.Unresolved) { + m_state = PromiseState.Cancelled; + result = true; + } else { + result = false; + } + } + + if (result) + OnStateChanged(); + + if (dependencies && m_parent != null && m_parent.IsExclusive) { + m_parent.Cancel(); + } + + return result; + } } }