Mercurial > pub > ImplabNet
diff Implab/Promise.cs @ 248:5cb4826c2c2a v3
Added awaiters to promises
Added static methods to Promise Resolve, Reject, All.
Updated promise helpers
author | cin |
---|---|
date | Tue, 30 Jan 2018 01:37:17 +0300 |
parents | |
children | d82909310094 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Promise.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Reflection; +using System.Threading.Tasks; +using Implab.Parallels; + +namespace Implab { + public class Promise : AbstractEvent<IResolvable>, IPromise { + public static IDispatcher DefaultDispatcher { + get { + return ThreadPoolDispatcher.Instance; + } + } + + class ResolvableSignal : IResolvable { + public Signal Signal { get; private set; } + public ResolvableSignal() { + Signal = new Signal(); + } + + + public void Reject(Exception error) { + Signal.Set(); + } + + public void Resolve() { + Signal.Set(); + } + } + + PromiseState m_state; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsFulfilled { + get { + return m_state == PromiseState.Fulfilled; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + + internal Promise() { + + } + + internal void ResolvePromise() { + if (BeginTransit()) { + m_state = PromiseState.Fulfilled; + CompleteTransit(); + } + } + + internal void RejectPromise(Exception reason) { + if (BeginTransit()) { + m_error = reason; + m_state = PromiseState.Rejected; + CompleteTransit(); + } + } + + + #region implemented abstract members of AbstractPromise + + protected override void SignalHandler(IResolvable handler) { + switch (m_state) { + case PromiseState.Fulfilled: + handler.Resolve(); + break; + case PromiseState.Rejected: + handler.Reject(RejectReason); + break; + default: + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); + } + } + + protected void WaitResult(int timeout) { + if (!(IsResolved || GetFulfillSignal().Wait(timeout))) + throw new TimeoutException(); + } + + protected Signal GetFulfillSignal() { + var next = new ResolvableSignal(); + Then(next); + return next.Signal; + } + + #endregion + + + public Type ResultType { + get { + return typeof(void); + } + } + + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + + public void Then(IResolvable next) { + AddHandler(next); + } + + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); + } + + public void Join() { + WaitResult(-1); + if (IsRejected) + Rethrow(); + } + + public void Join(int timeout) { + WaitResult(timeout); + if (IsRejected) + Rethrow(); + } + + public static ResolvedPromise Resolve() { + return new ResolvedPromise(); + } + + public static ResolvedPromise<T> Resolve<T>(T result) { + return new ResolvedPromise<T>(result); + } + + public static RejectedPromise Reject(Exception reason) { + return new RejectedPromise(reason); + } + + public static RejectedPromise<T> Reject<T>(Exception reason) { + return new RejectedPromise<T>(reason); + } + + public static IPromise Create(PromiseExecutor executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise(); + var d = new Deferred(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise<T> Create<T>(PromiseExecutor<T> executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise<T>(); + var d = new Deferred<T>(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise All(IEnumerable<IPromise> promises) { + var d = new Deferred(DefaultDispatcher); + var all = new PromiseAll(d); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + + public static IPromise<T[]> All<T>(IEnumerable<IPromise<T>> promises, Func<T, IPromise> cleanup, Action cancel) { + var d = new Deferred<T[]>(DefaultDispatcher); + var all = new PromiseAll<T>(d, cleanup, cancel); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + } +} +