view Implab/Promise.cs @ 262:f1696cdc3d7a v3 v3.0.8

Added IInitializable.Initialize() overload. Added IRunnable.Start(), IRunnable.Start() overloads. Fixed cancellation of the current operation when Stop() is called. More tests.
author cin
date Mon, 16 Apr 2018 02:12:39 +0300
parents 547a2fc0d93e
children
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Implab.Parallels;

namespace Implab {
    /// <summary>
    /// A promise without a result value. Once completed it is either fulfilled
    /// or rejected with an exception. Handlers added through
    /// <see cref="Then(IResolvable)"/> are told the outcome via
    /// <see cref="SignalHandler(IResolvable)"/>.
    /// </summary>
    public class Promise : AbstractEvent<IResolvable>, IPromise {
        /// <summary>
        /// The default dispatcher; always <c>ThreadPoolDispatcher.Instance</c>.
        /// </summary>
        public static IDispatcher DefaultDispatcher {
            get {
                return ThreadPoolDispatcher.Instance;
            }
        }

        /// <summary>
        /// Adapts a <c>Signal</c> to <see cref="IResolvable"/> so that a thread
        /// can block until the promise completes. The signal is set on both
        /// fulfillment and rejection; the rejection reason is ignored here and
        /// is read from the promise itself by the waiter.
        /// </summary>
        class ResolvableSignal : IResolvable {
            /// <summary>The signal that is set when the promise completes.</summary>
            public Signal Signal { get; private set; }

            public ResolvableSignal() {
                Signal = new Signal();
            }

            /// <summary>Sets the signal; <paramref name="error"/> is not used.</summary>
            public void Reject(Exception error) {
                Signal.Set();
            }

            /// <summary>Sets the signal.</summary>
            public void Resolve() {
                Signal.Set();
            }
        }

        // Outcome of the promise; assigned inside ResolvePromise/RejectPromise.
        PromiseState m_state;

        // Rejection reason; assigned only by RejectPromise.
        Exception m_error;

        /// <summary>True when the promise state is <c>PromiseState.Rejected</c>.</summary>
        public bool IsRejected {
            get {
                return m_state == PromiseState.Rejected;
            }
        }

        /// <summary>True when the promise state is <c>PromiseState.Fulfilled</c>.</summary>
        public bool IsFulfilled {
            get {
                return m_state == PromiseState.Fulfilled;
            }
        }

        /// <summary>
        /// The exception passed to <see cref="RejectPromise(Exception)"/>, or
        /// <c>null</c> if the promise has not been rejected.
        /// </summary>
        public Exception RejectReason {
            get {
                return m_error;
            }
        }

        /// <summary>
        /// Creates a promise that has not been completed. The constructor is
        /// internal, so instances are created only from within this assembly.
        /// </summary>
        internal Promise() {

        }

        /// <summary>
        /// Marks the promise as fulfilled. Does nothing when the inherited
        /// <c>BeginTransit()</c> returns false (presumably because the promise
        /// has already been completed — confirm against AbstractEvent).
        /// </summary>
        internal void ResolvePromise() {
            if (BeginTransit()) {
                // The state must be written before CompleteTransit() is called.
                m_state = PromiseState.Fulfilled;
                CompleteTransit();
            }
        }

        /// <summary>
        /// Marks the promise as rejected with the specified reason. Does nothing
        /// when the inherited <c>BeginTransit()</c> returns false.
        /// </summary>
        /// <param name="reason">The exception to expose through <see cref="RejectReason"/>.</param>
        internal void RejectPromise(Exception reason) {
            if (BeginTransit()) {
                // Store the reason and the state before CompleteTransit() is called.
                m_error = reason;
                m_state = PromiseState.Rejected;
                CompleteTransit();
            }
        }


        #region implemented abstract members of AbstractPromise

        /// <summary>
        /// Notifies a single handler about the outcome of the promise.
        /// </summary>
        /// <param name="handler">The handler to resolve or reject.</param>
        /// <exception cref="InvalidOperationException">
        /// The promise is neither fulfilled nor rejected.
        /// </exception>
        protected override void SignalHandler(IResolvable handler) {
            switch (m_state) {
                case PromiseState.Fulfilled:
                    handler.Resolve();
                    break;
                case PromiseState.Rejected:
                    handler.Reject(RejectReason);
                    break;
                default:
                    throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state));
            }
        }

        /// <summary>
        /// Blocks until the promise is resolved or the timeout elapses.
        /// </summary>
        /// <param name="timeout">
        /// Passed to <c>Signal.Wait</c>; presumably milliseconds with -1 meaning
        /// "wait indefinitely" — confirm against Signal.
        /// </param>
        /// <exception cref="TimeoutException">The wait returned false.</exception>
        protected void WaitResult(int timeout) {
            // Short-circuit: no signal is allocated when the promise is already resolved.
            if (!(IsResolved || GetFulfillSignal().Wait(timeout)))
                throw new TimeoutException();
        }

        /// <summary>
        /// Registers a new <see cref="ResolvableSignal"/> as a handler of this
        /// promise and returns its signal.
        /// </summary>
        protected Signal GetFulfillSignal() {
            var next = new ResolvableSignal();
            Then(next);
            return next.Signal;
        }

        #endregion


        /// <summary>The result type of this promise; always <c>typeof(void)</c>.</summary>
        public Type ResultType {
            get {
                return typeof(void);
            }
        }


        /// <summary>
        /// Throws an exception that wraps the rejection reason.
        /// </summary>
        /// <exception cref="OperationCanceledException">
        /// The reason is an <see cref="OperationCanceledException"/>; it becomes the inner exception.
        /// </exception>
        /// <exception cref="TargetInvocationException">
        /// Any other reason; it becomes the inner exception.
        /// </exception>
        protected void Rethrow() {
            Debug.Assert(m_error != null);
            if (m_error is OperationCanceledException)
                throw new OperationCanceledException("Operation cancelled", m_error);
            else
                throw new TargetInvocationException(m_error);
        }

        /// <summary>
        /// Adds a handler by passing it to the inherited <c>AddHandler</c>.
        /// </summary>
        /// <param name="next">The handler to add.</param>
        public void Then(IResolvable next) {
            AddHandler(next);
        }

        /// <summary>
        /// Not supported: a promise without a result cannot be cast to a typed promise.
        /// </summary>
        /// <exception cref="InvalidCastException">Always thrown.</exception>
        public IPromise<T> Cast<T>() {
            throw new InvalidCastException();
        }

        /// <summary>
        /// Waits for the promise to complete and throws if it was rejected
        /// (see <see cref="Rethrow"/> for the exception types).
        /// </summary>
        public void Join() {
            // -1 is the same value the original passed to WaitResult directly.
            Join(-1);
        }

        /// <summary>
        /// Waits for the promise to complete within the specified timeout and
        /// throws if it was rejected (see <see cref="Rethrow"/>).
        /// </summary>
        /// <param name="timeout">The timeout passed to <see cref="WaitResult(int)"/>.</param>
        /// <exception cref="TimeoutException">The promise did not complete in time.</exception>
        public void Join(int timeout) {
            WaitResult(timeout);
            if (IsRejected)
                Rethrow();
        }

        /// <summary>Returns a new <c>ResolvedPromise</c>.</summary>
        public static ResolvedPromise Resolve() {
            return new ResolvedPromise();
        }

        /// <summary>Returns a new <c>ResolvedPromise&lt;T&gt;</c> built from the specified result.</summary>
        /// <param name="result">The value passed to the resolved promise.</param>
        public static ResolvedPromise<T> Resolve<T>(T result) {
            return new ResolvedPromise<T>(result);
        }

        /// <summary>Returns a new <c>RejectedPromise</c> built from the specified reason.</summary>
        /// <param name="reason">The rejection reason.</param>
        public static RejectedPromise Reject(Exception reason) {
            return new RejectedPromise(reason);
        }

        /// <summary>Returns a new <c>RejectedPromise&lt;T&gt;</c> built from the specified reason.</summary>
        /// <param name="reason">The rejection reason.</param>
        public static RejectedPromise<T> Reject<T>(Exception reason) {
            return new RejectedPromise<T>(reason);
        }

        /// <summary>
        /// Runs the executor with a new deferred and returns its promise.
        /// Equivalent to calling the overload with <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="executor">The delegate that completes the deferred.</param>
        public static IPromise Create(PromiseExecutor executor) {
            return Create(executor, CancellationToken.None);
        }

        /// <summary>
        /// Runs the executor with a new deferred and returns its promise. The
        /// deferred is cancelled when <paramref name="ct"/> is cancelled.
        /// </summary>
        /// <param name="executor">The delegate that completes the deferred; must not be null.</param>
        /// <param name="ct">
        /// Cancellation token. If cancellation is already requested the executor
        /// is not invoked.
        /// </param>
        /// <returns>
        /// The promise of the deferred. An exception thrown by the executor is
        /// caught and passed to <c>Deferred.Reject</c>.
        /// </returns>
        public static IPromise Create(PromiseExecutor executor, CancellationToken ct) {
            Safe.ArgumentNotNull(executor, nameof(executor));

            var d = new Deferred();

            // Do NOT delegate to Create(executor) for non-cancelable tokens: that
            // overload forwards here with CancellationToken.None, which caused
            // unbounded recursion. Simply skip the registration instead.
            // NOTE(review): the registration is never disposed, so the callback
            // stays attached to the token after the promise completes.
            if (ct.CanBeCanceled)
                ct.Register(d.Cancel);

            try {
                if (!ct.IsCancellationRequested)
                    executor(d);
            } catch (Exception e) {
                d.Reject(e);
            }
            return d.Promise;
        }

        /// <summary>
        /// Runs the executor with a new typed deferred and returns its promise.
        /// Equivalent to calling the overload with <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="executor">The delegate that completes the deferred.</param>
        public static IPromise<T> Create<T>(PromiseExecutor<T> executor) {
            return Create(executor, CancellationToken.None);
        }

        /// <summary>
        /// Runs the executor with a new typed deferred and returns its promise.
        /// The deferred is cancelled when <paramref name="ct"/> is cancelled.
        /// </summary>
        /// <param name="executor">The delegate that completes the deferred; must not be null.</param>
        /// <param name="ct">
        /// Cancellation token. If cancellation is already requested the executor
        /// is not invoked.
        /// </param>
        /// <returns>
        /// The promise of the deferred. An exception thrown by the executor is
        /// caught and passed to <c>Deferred&lt;T&gt;.Reject</c>.
        /// </returns>
        public static IPromise<T> Create<T>(PromiseExecutor<T> executor, CancellationToken ct) {
            Safe.ArgumentNotNull(executor, nameof(executor));

            var d = new Deferred<T>();

            // Kept parallel to the non-generic overload; registering on a
            // non-cancelable token would do nothing anyway.
            // NOTE(review): the registration is never disposed.
            if (ct.CanBeCanceled)
                ct.Register(d.Cancel);

            try {
                if (!ct.IsCancellationRequested)
                    executor(d);
            } catch (Exception e) {
                d.Reject(e);
            }
            return d.Promise;
        }

        /// <summary>
        /// Aggregates the specified promises with a <c>PromiseAll</c> and
        /// returns its result promise.
        /// </summary>
        /// <param name="promises">The promises to aggregate; must not be null.</param>
        public static IPromise All(IEnumerable<IPromise> promises) {
            Safe.ArgumentNotNull(promises, nameof(promises));

            var all = new PromiseAll(new Deferred());
            foreach (var promise in promises) {
                all.AddPromise(promise);
                // Stop enumerating as soon as the aggregate reports it is done.
                if (all.Done)
                    break;
            }
            all.Complete();
            return all.ResultPromise;
        }

        /// <summary>
        /// Aggregates the specified typed promises with a <c>PromiseAll&lt;T&gt;</c>
        /// and returns its result promise.
        /// </summary>
        /// <param name="promises">The promises to aggregate; must not be null.</param>
        /// <param name="cleanup">
        /// Optional callback forwarded to <c>PromiseAll&lt;T&gt;</c>; its exact
        /// semantics are defined there.
        /// </param>
        /// <param name="cancel">
        /// Optional callback forwarded to <c>PromiseAll&lt;T&gt;</c>; its exact
        /// semantics are defined there.
        /// </param>
        public static IPromise<T[]> All<T>(IEnumerable<IPromise<T>> promises, Func<T, IPromise> cleanup = null, Action cancel = null) {
            Safe.ArgumentNotNull(promises, nameof(promises));

            var all = new PromiseAll<T>(new Deferred<T[]>(), cleanup, cancel);
            foreach (var promise in promises) {
                all.AddPromise(promise);
                // Stop enumerating as soon as the aggregate reports it is done.
                if (all.Done)
                    break;
            }
            all.Complete();
            return all.ResultPromise;
        }
    }
}