view Implab/Promise.cs @ 33:b255e4aeef17

removed the reference to the parent from the promise object this allows resolved promises to release parents and results they are holding. Added complete set of operations to IPromiseBase interface Subscribing to the cancellation event of the promise should not affect it's IsExclusive property More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 8eca2652d2ff
children 653c4e04968b
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Reflection;
using System.Diagnostics;
using System.Threading;
using Implab.Parallels;

namespace Implab {

    public delegate void ErrorHandler(Exception e);
    public delegate T ErrorHandler<out T>(Exception e);
    public delegate void ResultHandler<in T>(T result);
    public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
    public delegate IPromise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);

    /// <summary>
    /// Класс для асинхронного получения результатов. Так называемое "обещание".
    /// </summary>
    /// <typeparam name="T">Тип получаемого результата</typeparam>
    /// <remarks>
    /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции,
    /// клиент получив такое обещание может установить ряд обратных вызово для получения
    /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para>
    /// <para>
    /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на
    /// данные события клиент должен использовать методы <c>Then</c>.
    /// </para>
    /// <para>
    /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой),
    /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о
    /// выполнении обещания.
    /// </para>
    /// <para>
    /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
    /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
    /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
    /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
    /// обещания.
    /// </para>
    /// <para>
    /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
    /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
    /// использовать соответствующую форму методе <c>Then</c>.
    /// </para>
    /// <para>
    /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
    /// только инициатор обещания иначе могут возникнуть противоречия.
    /// </para>
    /// </remarks>
    public class Promise<T> : IPromise<T> {

        protected struct HandlerDescriptor {
            public ResultHandler<T> resultHandler;
            public ErrorHandler errorHandler;
            public Action cancellHandler;

            public void Resolve(T result) {
                if (resultHandler != null)
                    try {
                        resultHandler(result);
                    } catch (Exception e) {
                        Reject(e);
                    }
            }

            public void Reject(Exception err) {
                if (errorHandler != null)
                    try {
                        errorHandler(err);
                    } catch {
                    }
            }

            public void Cancel() {
                if (cancellHandler != null)
                    try {
                        cancellHandler();
                    } catch {
                    }
            }
        }

        const int UnresolvedSate = 0;
        const int TransitionalState = 1;
        const int SucceededState = 2;
        const int RejectedState = 3;
        const int CancelledState = 4;

        readonly bool m_cancellable;

        int m_childrenCount = 0;
        int m_state;
        T m_result;
        Exception m_error;

        readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();

        public Promise() {
            m_cancellable = true;
        }

        public Promise(IPromiseBase parent, bool cancellable) {
            m_cancellable = cancellable;
            if (parent != null)
                AddHandler(
                    null,
                    null,
                    () => {
                        if (parent.IsExclusive)
                            parent.Cancel();
                    }
                );
        }

        bool BeginTransit() {
            return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
        }

        void CompleteTransit(int state) {
            if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        void WaitTransition() {
            while (m_state == TransitionalState) {
                /* noop */
            }
        }

        public bool IsResolved {
            get {
                return m_state > 1;
            }
        }

        public bool IsCancelled {
            get {
                return m_state == CancelledState;
            }
        }

        public Type PromiseType {
            get { return typeof(T); }
        }

        /// <summary>
        /// Выполняет обещание, сообщая об успешном выполнении.
        /// </summary>
        /// <param name="result">Результат выполнения.</param>
        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
        public void Resolve(T result) {
            if (BeginTransit()) {
                m_result = result;
                CompleteTransit(SucceededState);
                OnStateChanged();
            } else {
                WaitTransition();
                if (m_state != CancelledState)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения.
        /// </summary>
        /// <remarks>
        /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение.
        /// </remarks>
        public void Resolve() {
            Resolve(default(T));
        }

        /// <summary>
        /// Выполняет обещание, сообщая об ошибке
        /// </summary>
        /// <remarks>
        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
        /// будут проигнорированы.
        /// </remarks>
        /// <param name="error">Исключение возникшее при выполнении операции</param>
        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
        public void Reject(Exception error) {
            if (BeginTransit()) {
                m_error = error;
                CompleteTransit(RejectedState);
                OnStateChanged();
            } else {
                WaitTransition();
                if (m_state == SucceededState)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Отменяет операцию, если это возможно.
        /// </summary>
        /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
        public bool Cancel() {
            if (BeginTransit()) {
                CompleteTransit(CancelledState);
                OnStateChanged();
                return true;
            } else {
                return false;
            }
        }

        // сделано для возвращаемого типа void
        protected void InternalCancel() {
            Cancel();
        }

        /// <summary>
        /// Adds new handlers to this promise.
        /// </summary>
        /// <param name="success">The handler of the successfully completed operation.
        /// This handler will recieve an operation result as a parameter.</param>
        /// <param name="error">Handles an exception that may occur during the operation.</param>
        /// <returns>The new promise chained to this one.</returns>
        public IPromise<T> Then(ResultHandler<T> success, ErrorHandler error) {
            if (success == null && error == null)
                return this;

            var medium = new Promise<T>(this, true);

            ResultHandler<T> resultHandler;
            if (success != null)
                resultHandler = x => {
                    success(x);
                    medium.Resolve(x);
                };
            else
                resultHandler = medium.Resolve;

            ErrorHandler errorHandler;
            if (error != null)
                errorHandler = x => {
                    // несмотря на то, что обработчик ошибки вызывается безопасно,
                    // т.е. возникшие в нем ошибки будут подавлены, нам нужно
                    // гарантировать, что ошибка будет передана дальше по цепочке обещаний
                    try {
                        error(x);
                    } catch { }
                    medium.Reject(x);
                };
            else
                errorHandler = medium.Reject;

            AddHandler(resultHandler, errorHandler, medium.InternalCancel);

            return medium;
        }

        public IPromiseBase Then(Action success, ErrorHandler error) {
            return Then(x => success(), error);
        }

        public IPromiseBase Then(Action success) {
            return Then(x => success());
        }

        /// <summary>
        /// Adds new handlers to this promise.
        /// </summary>
        /// <param name="success">The handler of the successfully completed operation.
        /// This handler will recieve an operation result as a parameter.</param>
        /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
        /// <returns>The new promise chained to this one.</returns>
        public IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
            if (success == null && error == null)
                return this;

            var medium = new Promise<T>(this, true);

            ResultHandler<T> resultHandler;
            ErrorHandler errorHandler;

            if (success != null)
                resultHandler = x => {
                    success(x);
                    medium.Resolve(x);
                };
            else
                resultHandler = medium.Resolve;

            if (error != null)
                errorHandler = x => {
                    try {
                        medium.Resolve(error(x));
                    } catch (Exception e) {
                        medium.Reject(e);
                    }
                };
            else
                errorHandler = medium.Reject;

            AddHandler(resultHandler, errorHandler, medium.InternalCancel);

            return medium;
        }


        public IPromise<T> Then(ResultHandler<T> success) {
            if (success == null)
                return this;

            var medium = new Promise<T>(this, true);

            ResultHandler<T> resultHandler;

            if (success != null)
                resultHandler = x => {
                    success(x);
                    medium.Resolve(x);
                };
            else
                resultHandler = medium.Resolve;

            AddHandler(resultHandler, medium.Reject, medium.InternalCancel);

            return medium;
        }

        public IPromise<T> Error(ErrorHandler error) {
            return Then((ResultHandler<T>)null, error);
        }

        /// <summary>
        /// Handles error and allows to keep the promise.
        /// </summary>
        /// <remarks>
        /// If the specified handler throws an exception, this exception will be used to reject the promise.
        /// </remarks>
        /// <param name="handler">The error handler which returns the result of the promise.</param>
        /// <returns>New promise.</returns>
        public IPromise<T> Error(ErrorHandler<T> handler) {
            if (handler == null)
                return this;

            var medium = new Promise<T>(this, true);

            AddHandler(
                x => medium.Resolve(x),
                e => {
                    try {
                        medium.Resolve(handler(e));
                    } catch (Exception e2) {
                        medium.Reject(e2);
                    }
                },
                medium.InternalCancel
            );

            return medium;
        }

        public IPromise<T> Anyway(Action handler) {
            if (handler == null)
                return this;

            var medium = new Promise<T>(this,true);

            AddHandler(
                x => {
                    // to avoid handler being called multiple times we handle exception by ourselfs
                    try {
                        handler();
                        medium.Resolve(x);
                    } catch (Exception e) {
                        medium.Reject(e);
                    }
                },

                e => {
                    try {
                        handler();
                    } catch { }
                    medium.Reject(e);
                },

                medium.InternalCancel
            );

            return medium;
        }

        /// <summary>
        /// Позволяет преобразовать результат выполения операции к новому типу.
        /// </summary>
        /// <typeparam name="TNew">Новый тип результата.</typeparam>
        /// <param name="mapper">Преобразование результата к новому типу.</param>
        /// <param name="error">Обработчик ошибки. Данный обработчик получит
        /// исключение возникшее при выполнении операции.</param>
        /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
        public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
            if (mapper == null)
                throw new ArgumentNullException("mapper");

            // создаем прицепленное обещание
            var chained = new Promise<TNew>(this,true);

            ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
            ErrorHandler errorHandler = delegate(Exception e) {
                if (error != null)
                    try {
                        error(e);
                    } catch { }
                // в случае ошибки нужно передать исключение дальше по цепочке
                chained.Reject(e);
            };


            AddHandler(
                resultHandler,
                errorHandler,
                chained.InternalCancel
            );

            return chained;
        }

        public IPromise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
            return Map(mapper, null);
        }

        /// <summary>
        /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
        /// выполнения текущей, а результат текущей операции может быть использован для инициализации
        /// новой операции.
        /// </summary>
        /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
        /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
        /// <param name="error">Обработчик ошибки. Данный обработчик получит
        /// исключение возникшее при выполнении текуещй операции.</param>
        /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
        public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler error) {

            // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
            // создать посредника, к которому будут подвызяваться следующие обработчики.
            // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
            // передать через него результаты работы.
            var medium = new Promise<TNew>(this, true);

            ResultHandler<T> resultHandler = delegate(T result) {
                if (medium.IsCancelled)
                    return;

                var promise = chained(result);

                promise.Then(
                    x => medium.Resolve(x),
                    e => medium.Reject(e)
                );
                
                // notify chained operation that it's not needed anymore
                // порядок вызова Then, Cancelled важен, поскольку от этого
                // зависит IsExclusive
                medium.Cancelled(() => {
                    if(promise.IsExclusive)
                        promise.Cancel();
                });

                // внешняя отмена связанной операции рассматривается как ошибка
                promise.Cancelled(() => medium.Reject(new OperationCanceledException()));
            };

            ErrorHandler errorHandler = delegate(Exception e) {
                if (error != null)
                    error(e);
                // в случае ошибки нужно передать исключение дальше по цепочке
                medium.Reject(e);
            };

            AddHandler(
                resultHandler,
                errorHandler,
                medium.InternalCancel
            );

            return medium;
        }

        public IPromise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
            return Chain(chained, null);
        }

        public IPromise<T> Cancelled(Action handler) {
            AddHandler(null, null, handler);
            return this;
        }

        /// <summary>
        /// Adds the specified handler for all cases (success, error, cancel)
        /// </summary>
        /// <param name="handler">The handler that will be called anyway</param>
        /// <returns>self</returns>
        public IPromise<T> Finally(Action handler) {
            if (handler == null)
                throw new ArgumentNullException("handler");
            AddHandler(
                x => handler(),
                e => handler(),
                handler
            );
            return this;
        }

        /// <summary>
        /// Преобразует результат обещания к нужному типу
        /// </summary>
        /// <typeparam name="T2"></typeparam>
        /// <returns></returns>
        public IPromise<T2> Cast<T2>() {
            return Map(x => (T2)(object)x, null);
        }

        /// <summary>
        /// Дожидается отложенного обещания и в случае успеха, возвращает
        /// его, результат, в противном случае бросает исключение.
        /// </summary>
        /// <remarks>
        /// <para>
        /// Если ожидание обещания было прервано по таймауту, это не значит,
        /// что обещание было отменено или что-то в этом роде, это только
        /// означает, что мы его не дождались, однако все зарегистрированные
        /// обработчики, как были так остались и они будут вызваны, когда
        /// обещание будет выполнено.
        /// </para>
        /// <para>
        /// Такое поведение вполне оправдано поскольку таймаут может истечь
        /// в тот момент, когда началась обработка цепочки обработчиков, и
        /// к тому же текущее обещание может стоять в цепочке обещаний и его
        /// отклонение может привести к непрогнозируемому результату.
        /// </para>
        /// </remarks>
        /// <param name="timeout">Время ожидания</param>
        /// <returns>Результат выполнения обещания</returns>
        public T Join(int timeout) {
            var evt = new ManualResetEvent(false);
            Anyway(() => evt.Set());
            Cancelled(() => evt.Set());

            if (!evt.WaitOne(timeout, true))
                throw new TimeoutException();

            switch (m_state) {
                case SucceededState:
                    return m_result;
                case CancelledState:
                    throw new OperationCanceledException();
                case RejectedState:
                    throw new TargetInvocationException(m_error);
                default:
                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
            }
        }

        public T Join() {
            return Join(Timeout.Infinite);
        }

        void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
            if (success != null || error != null)
                Interlocked.Increment(ref m_childrenCount);

            HandlerDescriptor handler = new HandlerDescriptor {
                resultHandler = success,
                errorHandler = error,
                cancellHandler = cancel
            };

            bool queued;

            if (!IsResolved) {
                m_handlers.Enqueue(handler);
                queued = true;
            } else {
                // the promise is in resolved state, just invoke the handled with minimum overhead
                queued = false;
                InvokeHandler(handler);
            }

            if (queued && IsResolved && m_handlers.TryDequeue(out handler))
                // if the promise have been resolved while we was adding handler to the queue
                // we can't guarantee that someone is still processing it
                // therefore we will fetch a handler from the queue and execute it
                // note that fetched handler may be not the one that we have added
                // even we can fetch no handlers at all :)
                InvokeHandler(handler);
        }

        protected virtual void InvokeHandler(HandlerDescriptor handler) {
            switch (m_state) {
                case SucceededState:
                    handler.Resolve(m_result);
                    break;
                case RejectedState:
                    handler.Reject(m_error);
                    break;
                case CancelledState:
                    handler.Cancel();
                    break;
                default:
                    // do nothing
                    return;
            }
        }

        protected virtual void OnStateChanged() {
            HandlerDescriptor handler;
            while (m_handlers.TryDequeue(out handler))
                InvokeHandler(handler);
        }

        public bool IsExclusive {
            get {
                return m_childrenCount <= 1;
            }
        }

        /// <summary>
        /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
        /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
        /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
        /// </summary>
        /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
        /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
        public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) {
            if (promises == null)
                throw new ArgumentNullException();

            // создаем аккумулятор для результатов и результирующее обещание
            var result = new T[promises.Count];
            var promise = new Promise<T[]>();

            // special case
            if (promises.Count == 0) {
                promise.Resolve(result);
                return promise;
            }

            int pending = promises.Count;

            for (int i = 0; i < promises.Count; i++) {
                var dest = i;

                if (promises[i] != null) {
                    promises[i].Then(
                        x => {
                            result[dest] = x;
                            if (Interlocked.Decrement(ref pending) == 0)
                                promise.Resolve(result);
                        },
                        e => promise.Reject(e)
                    );
                } else {
                    if (Interlocked.Decrement(ref pending) == 0)
                        promise.Resolve(result);
                }
            }

            promise.Cancelled(
                () => {
                    foreach (var d in promises)
                        if (d != null && d.IsExclusive)
                            d.Cancel();
                }
            );

            return promise;
        }

        /// <summary>
        /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при
        /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний
        /// игнорируются.
        /// </summary>
        /// <param name="promises">Коллекция первичных обещаний, которые будут объеденены в одно.</param>
        /// <returns>Новое обещание, объединяющее в себе переданные.</returns>
        /// <remarks>
        /// Если в коллекции встречаюься <c>null</c>, то они воспринимаются как выполненные обещания.
        /// </remarks>
        public static IPromiseBase CreateComposite(ICollection<IPromiseBase> promises) {
            if (promises == null)
                throw new ArgumentNullException();
            if (promises.Count == 0)
                return Promise<object>.ResultToPromise(null);

            int countdown = promises.Count;

            var result = new Promise<object>();

            foreach (var d in promises) {
                if (d == null) {
                    if (Interlocked.Decrement(ref countdown) == 0)
                        result.Resolve(null);
                } else {
                    d.Then(() => {
                        if (Interlocked.Decrement(ref countdown) == 0)
                            result.Resolve(null);
                    });
                }
            }

            result.Cancelled(() => {
                foreach (var d in promises)
                    if (d != null && d.IsExclusive)
                        d.Cancel();
            });

            return result;
        }

        public static Promise<T> ResultToPromise(T result) {
            var p = new Promise<T>();
            p.Resolve(result);
            return p;
        }

        public static Promise<T> ExceptionToPromise(Exception error) {
            if (error == null)
                throw new ArgumentNullException();

            var p = new Promise<T>();
            p.Reject(error);
            return p;
        }

        #region IPromiseBase explicit implementation

        IPromiseBase IPromiseBase.Error(ErrorHandler error) {
            return Error(error);
        }

        IPromiseBase IPromiseBase.Anyway(Action handler) {
            return Anyway(handler);
        }

        IPromiseBase IPromiseBase.Finally(Action handler) {
            return Finally(handler);
        }

        IPromiseBase IPromiseBase.Cancelled(Action handler) {
            return Cancelled(handler);
        }

        void IPromiseBase.Join() {
            Join();
        }

        void IPromiseBase.Join(int timeout) {
            Join(timeout);
        }

        #endregion


        
    }
}