Mercurial > pub > ImplabNet
view Implab/Promise.cs @ 262:f1696cdc3d7a v3 v3.0.8
Added IInitializable.Initialize() overload
Added IRunnable.Start(), IRunnable.Start() overloads
Fixed cancellation of the current operation when Stop() is called
More tests
author | cin |
---|---|
date | Mon, 16 Apr 2018 02:12:39 +0300 |
parents | 547a2fc0d93e |
children |
line wrap: on
line source
using System; using System.Collections.Generic; using System.Diagnostics; using System.Reflection; using System.Threading; using System.Threading.Tasks; using Implab.Parallels; namespace Implab { public class Promise : AbstractEvent<IResolvable>, IPromise { public static IDispatcher DefaultDispatcher { get { return ThreadPoolDispatcher.Instance; } } class ResolvableSignal : IResolvable { public Signal Signal { get; private set; } public ResolvableSignal() { Signal = new Signal(); } public void Reject(Exception error) { Signal.Set(); } public void Resolve() { Signal.Set(); } } PromiseState m_state; Exception m_error; public bool IsRejected { get { return m_state == PromiseState.Rejected; } } public bool IsFulfilled { get { return m_state == PromiseState.Fulfilled; } } public Exception RejectReason { get { return m_error; } } internal Promise() { } internal void ResolvePromise() { if (BeginTransit()) { m_state = PromiseState.Fulfilled; CompleteTransit(); } } internal void RejectPromise(Exception reason) { if (BeginTransit()) { m_error = reason; m_state = PromiseState.Rejected; CompleteTransit(); } } #region implemented abstract members of AbstractPromise protected override void SignalHandler(IResolvable handler) { switch (m_state) { case PromiseState.Fulfilled: handler.Resolve(); break; case PromiseState.Rejected: handler.Reject(RejectReason); break; default: throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); } } protected void WaitResult(int timeout) { if (!(IsResolved || GetFulfillSignal().Wait(timeout))) throw new TimeoutException(); } protected Signal GetFulfillSignal() { var next = new ResolvableSignal(); Then(next); return next.Signal; } #endregion public Type ResultType { get { return typeof(void); } } protected void Rethrow() { Debug.Assert(m_error != null); if (m_error is OperationCanceledException) throw new OperationCanceledException("Operation cancelled", m_error); else throw new TargetInvocationException(m_error); } public void Then(IResolvable next) { AddHandler(next); } public IPromise<T> Cast<T>() { throw new InvalidCastException(); } public void Join() { WaitResult(-1); if (IsRejected) Rethrow(); } public void Join(int timeout) { WaitResult(timeout); if (IsRejected) Rethrow(); } public static ResolvedPromise Resolve() { return new ResolvedPromise(); } public static ResolvedPromise<T> Resolve<T>(T result) { return new ResolvedPromise<T>(result); } public static RejectedPromise Reject(Exception reason) { return new RejectedPromise(reason); } public static RejectedPromise<T> Reject<T>(Exception reason) { return new RejectedPromise<T>(reason); } public static IPromise Create(PromiseExecutor executor) { return Create(executor, CancellationToken.None); } public static IPromise Create(PromiseExecutor executor, CancellationToken ct) { Safe.ArgumentNotNull(executor, nameof(executor)); if (!ct.CanBeCanceled) return Create(executor); var d = new Deferred(); ct.Register(d.Cancel); try { if (!ct.IsCancellationRequested) executor(d); } catch(Exception e) { d.Reject(e); } return d.Promise; } public static IPromise<T> Create<T>(PromiseExecutor<T> executor) { return Create(executor, CancellationToken.None); } public static IPromise<T> Create<T>(PromiseExecutor<T> executor, CancellationToken ct) { Safe.ArgumentNotNull(executor, nameof(executor)); var d = new Deferred<T>(); ct.Register(d.Cancel); try { if (!ct.IsCancellationRequested) executor(d); } catch(Exception e) { d.Reject(e); } return d.Promise; } public static IPromise All(IEnumerable<IPromise> promises) { var d = new Deferred(); var all = new PromiseAll(d); foreach (var promise in promises) { all.AddPromise(promise); if (all.Done) break; } all.Complete(); return all.ResultPromise; } public static IPromise<T[]> All<T>(IEnumerable<IPromise<T>> promises, Func<T, IPromise> cleanup = null, Action cancel = null) { var d = new Deferred<T[]>(); var all = new PromiseAll<T>(d, cleanup, cancel); foreach (var promise in promises) { all.AddPromise(promise); if (all.Done) break; } all.Complete(); return all.ResultPromise; } } }