diff Implab/Promise.cs @ 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400
parents 5a4b735ba669
children e3935fdf59a2
line wrap: on
line diff
--- a/Implab/Promise.cs	Mon Oct 21 02:34:31 2013 +0400
+++ b/Implab/Promise.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -1,18 +1,16 @@
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reflection;
-using System.Text;
 using System.Diagnostics;
 using System.Threading;
 
 namespace Implab {
 
     public delegate void ErrorHandler(Exception e);
-
-    public delegate void ResultHandler<T>(T result);
-    public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
-    public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
+    public delegate T ErrorHandler<out T>(Exception e);
+    public delegate void ResultHandler<in T>(T result);
+    public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
+    public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
 
     /// <summary>
     /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -48,23 +46,23 @@
     /// только инициатор обещания иначе могут возникнуть противоречия.
     /// </para>
     /// </remarks>
-    public class Promise<T> {
+    public class Promise<T> : IPromise {
 
         struct ResultHandlerInfo {
             public ResultHandler<T> resultHandler;
             public ErrorHandler errorHandler;
         }
 
-        enum State {
-            Unresolved,
-            Resolving,
-            Resolved,
-            Cancelled
-        }
+        readonly IPromise m_parent;
+
+        LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
+        LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
 
-        LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
-        State m_state;
-        bool m_cancellable;
+        readonly object m_lock = new Object();
+        readonly bool m_cancellable;
+        int m_childrenCount = 0;
+
+        PromiseState m_state;
         T m_result;
         Exception m_error;
 
@@ -72,13 +70,17 @@
             m_cancellable = true;
         }
 
-        /// <summary>
-        /// Событие, возникающее при отмене асинхронной операции.
-        /// </summary>
-        /// <description>
-        /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
-        /// </description>
-        public event EventHandler Cancelled;
+        public Promise(IPromise parent, bool cancellable) {
+            m_cancellable = cancellable;
+            m_parent = parent;
+            if (parent != null)
+                parent.HandleCancelled(InternalCancel);
+        }
+
+        void InternalCancel() {
+            // don't try to cancel parent :)
+            Cancel(false);
+        }
 
         /// <summary>
         /// Выполняет обещание, сообщая об успешном выполнении.
@@ -86,38 +88,39 @@
         /// <param name="result">Результат выполнения.</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Resolve(T result) {
-            lock (this) {
-                if (m_state == State.Cancelled)
+            lock (m_lock) {
+                if (m_state == PromiseState.Cancelled)
                     return;
-                if (m_state != State.Unresolved)
+                if (m_state != PromiseState.Unresolved)
                     throw new InvalidOperationException("The promise is already resolved");
                 m_result = result;
-                m_state = State.Resolving;
+                m_state = PromiseState.Resolved;
             }
 
-            ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler);
+            OnStateChanged();
         }
 
         /// <summary>
         /// Выполняет обещание, сообщая об ошибке
         /// </summary>
+        /// <remarks>
+        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
+        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
+        /// будут проигнорированы.
+        /// </remarks>
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
-            lock (this) {
-                if (m_state == State.Cancelled)
+            lock (m_lock) {
+                if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
                     return;
-                if (m_state != State.Unresolved)
+                if (m_state != PromiseState.Unresolved)
                     throw new InvalidOperationException("The promise is already resolved");
                 m_error = error;
-                m_state = State.Resolving;
+                m_state = PromiseState.Rejected;
             }
 
-            ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler);
+            OnStateChanged();
         }
 
         /// <summary>
@@ -125,47 +128,31 @@
         /// </summary>
         /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
         public bool Cancel() {
-            lock (this) {
-                if (m_state == State.Unresolved && m_cancellable) {
-                    m_state = State.Cancelled;
-                    EventHandler temp = Cancelled;
-
-                    if (temp != null)
-                        temp(this, new EventArgs());
-
-                    return true;
-                } else
-                    return false;
-            }
+            return Cancel(true);
         }
 
         /// <summary>
-        /// Добавляет обработчики событий выполнения обещания.
+        /// Adds new handlers to this promise.
         /// </summary>
-        /// <param name="success">Обработчик успешного выполнения обещания.
-        /// Данному обработчику будет передан результат выполнения операции.</param>
-        /// <param name="error">Обработчик ошибки. Данный обработчик получит
-        /// исключение возникшее при выполнении операции.</param>
-        /// <returns>Само обещание</returns>
+        /// <param name="success">The handler of the successfully completed operation.
+        /// This handler will recieve an operation result as a parameter.</param>
+        /// <param name="error">Handles an exception that may occur during the operation.</param>
+        /// <returns>The new promise chained to this one.</returns>
         public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
             if (success == null && error == null)
                 return this;
 
-            var medium = new Promise<T>();
+            var medium = new Promise<T>(this, true);
 
             var handlerInfo = new ResultHandlerInfo();
 
             if (success != null)
                 handlerInfo.resultHandler = x => {
-                    try {
-                        success(x);
-                        medium.Resolve(x);
-                    } catch (Exception e) {
-                        medium.Reject(e);
-                    }
+                    success(x);
+                    medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = x => medium.Resolve(x);
+                handlerInfo.resultHandler = medium.Resolve;
 
             if (error != null)
                 handlerInfo.errorHandler = x => {
@@ -175,21 +162,106 @@
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = x => medium.Reject(x);
+                handlerInfo.errorHandler = medium.Reject;
+
+            AddHandler(handlerInfo);
+
+            return medium;
+        }
+
+        /// <summary>
+        /// Adds new handlers to this promise.
+        /// </summary>
+        /// <param name="success">The handler of the successfully completed operation.
+        /// This handler will recieve an operation result as a parameter.</param>
+        /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
+        /// <returns>The new promise chained to this one.</returns>
+        public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
+            if (success == null && error == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            var handlerInfo = new ResultHandlerInfo();
+
+            if (success != null)
+                handlerInfo.resultHandler = x => {
+                    success(x);
+                    medium.Resolve(x);
+                };
+            else
+                handlerInfo.resultHandler = medium.Resolve;
+
+            if (error != null)
+                handlerInfo.errorHandler = x => {
+                    try {
+                        medium.Resolve(error(x));
+                    } catch { }
+                    medium.Reject(x);
+                };
+            else
+                handlerInfo.errorHandler = medium.Reject;
 
             AddHandler(handlerInfo);
 
             return medium;
         }
 
+
         public Promise<T> Then(ResultHandler<T> success) {
-            return Then(success, null);
+            if (success == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            var handlerInfo = new ResultHandlerInfo();
+
+            if (success != null)
+                handlerInfo.resultHandler = x => {
+                    success(x);
+                    medium.Resolve(x);
+                };
+            else
+                handlerInfo.resultHandler = medium.Resolve;
+
+            handlerInfo.errorHandler = medium.Reject;
+
+            AddHandler(handlerInfo);
+
+            return medium;
         }
 
         public Promise<T> Error(ErrorHandler error) {
             return Then(null, error);
         }
 
+        /// <summary>
+        /// Handles error and allows to keep the promise.
+        /// </summary>
+        /// <remarks>
+        /// If the specified handler throws an exception, this exception will be used to reject the promise.
+        /// </remarks>
+        /// <param name="handler">The error handler which returns the result of the promise.</param>
+        /// <returns>New promise.</returns>
+        public Promise<T> Error(ErrorHandler<T> handler) {
+            if (handler == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            AddHandler(new ResultHandlerInfo {
+                errorHandler = e => {
+                    try {
+                        medium.Resolve(handler(e));
+                    } catch (Exception e2) {
+                        medium.Reject(e2);
+                    }
+                }
+            });
+
+            return medium;
+        }
+
         public Promise<T> Anyway(Action handler) {
             if (handler == null)
                 return this;
@@ -198,6 +270,7 @@
 
             AddHandler(new ResultHandlerInfo {
                 resultHandler = x => {
+                    // to avoid handler being called multiple times we handle exception by ourselfs
                     try {
                         handler();
                         medium.Resolve(x);
@@ -229,20 +302,15 @@
                 throw new ArgumentNullException("mapper");
 
             // создаем прицепленное обещание
-            Promise<TNew> chained = new Promise<TNew>();
+            var chained = new Promise<TNew>();
 
             AddHandler(new ResultHandlerInfo() {
-                resultHandler = delegate(T result) {
-                    try {
-                        // если преобразование выдаст исключение, то сработает reject сцепленного deferred
-                        chained.Resolve(mapper(result));
-                    } catch (Exception e) {
-                        chained.Reject(e);
-                    }
-                },
+                resultHandler = result => chained.Resolve(mapper(result)),
                 errorHandler = delegate(Exception e) {
                     if (error != null)
-                        error(e);
+                        try {
+                            error(e);
+                        } catch { }
                     // в случае ошибки нужно передать исключение дальше по цепочке
                     chained.Reject(e);
                 }
@@ -271,19 +339,21 @@
             // создать посредника, к которому будут подвызяваться следующие обработчики.
             // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
             // передать через него результаты работы.
-            Promise<TNew> medium = new Promise<TNew>();
+            var medium = new Promise<TNew>(this, true);
 
-            AddHandler(new ResultHandlerInfo() {
+            AddHandler(new ResultHandlerInfo {
                 resultHandler = delegate(T result) {
-                    try {
-                        chained(result).Then(
-                            x => medium.Resolve(x),
-                            e => medium.Reject(e)
-                            );
-                    } catch (Exception e) {
-                        // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
-                        medium.Reject(e);
-                    }
+                    if (medium.State == PromiseState.Cancelled)
+                        return;
+
+                    var promise = chained(result);
+
+                    // notify chained operation that it's not needed
+                    medium.Cancelled(() => promise.Cancel());
+                    promise.Then(
+                        x => medium.Resolve(x),
+                        e => medium.Reject(e)
+                    );
                 },
                 errorHandler = delegate(Exception e) {
                     if (error != null)
@@ -300,6 +370,22 @@
             return Chain(chained, null);
         }
 
+        public Promise<T> Cancelled(Action handler) {
+            if (handler == null)
+                return this;
+            lock (m_lock) {
+                if (m_state == PromiseState.Unresolved)
+                    m_cancelHandlers.AddLast(handler);
+                else if (m_state == PromiseState.Cancelled)
+                    handler();
+            }
+            return this;
+        }
+
+        public void HandleCancelled(Action handler) {
+            Cancelled(handler);
+        }
+
         /// <summary>
         /// Дожидается отложенного обещания и в случае успеха, возвращает
         /// его, результат, в противном случае бросает исключение.
@@ -322,52 +408,37 @@
         /// <param name="timeout">Время ожидания</param>
         /// <returns>Результат выполнения обещания</returns>
         public T Join(int timeout) {
-            ManualResetEvent evt = new ManualResetEvent(false);
+            var evt = new ManualResetEvent(false);
             Anyway(() => evt.Set());
+            Cancelled(() => evt.Set());
 
             if (!evt.WaitOne(timeout, true))
                 throw new TimeoutException();
 
-            if (m_error != null)
-                throw new TargetInvocationException(m_error);
-            else
-                return m_result;
+            switch (State) {
+                case PromiseState.Resolved:
+                    return m_result;
+                case PromiseState.Cancelled:
+                    throw new OperationCanceledException();
+                case PromiseState.Rejected:
+                    throw new TargetInvocationException(m_error);
+                default:
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+            }
         }
 
         public T Join() {
             return Join(Timeout.Infinite);
         }
 
-        /// <summary>
-        /// Данный метод последовательно извлекает обработчики обещания и когда
-        /// их больше не осталось - ставит состояние "разрешено".
-        /// </summary>
-        /// <param name="handler">Информация об обработчике</param>
-        /// <returns>Признак того, что еще остались обработчики в очереди</returns>
-        bool FetchNextHandler(out ResultHandlerInfo handler) {
-            handler = default(ResultHandlerInfo);
-
-            lock (this) {
-                Debug.Assert(m_state == State.Resolving);
-
-                if (m_handlersChain.Count > 0) {
-                    handler = m_handlersChain.First.Value;
-                    m_handlersChain.RemoveFirst();
-                    return true;
-                } else {
-                    m_state = State.Resolved;
-                    return false;
-                }
-            }
-        }
-
         void AddHandler(ResultHandlerInfo handler) {
             bool invokeRequired = false;
 
-            lock (this) {
-                if (m_state != State.Resolved)
-                    m_handlersChain.AddLast(handler);
-                else
+            lock (m_lock) {
+                m_childrenCount++;
+                if (m_state == PromiseState.Unresolved) {
+                    m_resultHandlers.AddLast(handler);
+                } else
                     invokeRequired = true;
             }
 
@@ -377,21 +448,102 @@
         }
 
         void InvokeHandler(ResultHandlerInfo handler) {
-            if (m_error == null) {
-                try {
-                    if (handler.resultHandler != null)
-                        handler.resultHandler(m_result);
-                } catch { }
-            }
-
-            if (m_error != null) {
-                try {
-                    if (handler.errorHandler != null)
-                        handler.errorHandler(m_error);
-                } catch { }
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    try {
+                        if (handler.resultHandler != null)
+                            handler.resultHandler(m_result);
+                    } catch (Exception e) {
+                        try {
+                            if (handler.errorHandler != null)
+                                handler.errorHandler(e);
+                        } catch { }
+                    }
+                    break;
+                case PromiseState.Rejected:
+                    try {
+                        if (handler.errorHandler != null)
+                            handler.errorHandler(m_error);
+                    } catch { }
+                    break;
+                default:
+                    // do nothing
+                    return;
             }
         }
 
+        protected virtual void OnStateChanged() {
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.resultHandler != null)
+                                resultHandlerInfo.resultHandler(m_result);
+                        } catch (Exception e) {
+                            try {
+                                if (resultHandlerInfo.errorHandler != null)
+                                    resultHandlerInfo.errorHandler(e);
+                            } catch { }
+                        }
+                    break;
+                case PromiseState.Cancelled:
+                    foreach (var cancelHandler in m_cancelHandlers)
+                        cancelHandler();
+                    break;
+                case PromiseState.Rejected:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.errorHandler != null)
+                                resultHandlerInfo.errorHandler(m_error);
+                        } catch { }
+                    break;
+                default:
+                    throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
+            }
+
+            m_resultHandlers = null;
+            m_cancelHandlers = null;
+        }
+
+
+
+        public bool IsExclusive {
+            get {
+                lock (m_lock) {
+                    return m_childrenCount <= 1;
+                }
+            }
+        }
+
+        public PromiseState State {
+            get {
+                lock (m_lock) {
+                    return m_state;
+                }
+            }
+        }
+
+        protected bool Cancel(bool dependencies) {
+            bool result;
+
+            lock (m_lock) {
+                if (m_state == PromiseState.Unresolved) {
+                    m_state = PromiseState.Cancelled;
+                    result = true;
+                } else {
+                    result = false;
+                }
+            }
+
+            if (result)
+                OnStateChanged();
+
+            if (dependencies && m_parent != null && m_parent.IsExclusive) {
+                m_parent.Cancel();
+            }
+
+            return result;
+        }
 
     }
 }