Mercurial > pub > ImplabNet
changeset 248:5cb4826c2c2a v3
Added awaiters to promises
Added static methods to Promise Resolve, Reject, All.
Updated promise helpers
line wrap: on
line diff
--- a/Implab/AbstractEvent.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/AbstractEvent.cs Tue Jan 30 01:37:17 2018 +0300 @@ -69,7 +69,7 @@ protected void WaitTransition() { if (m_state == TransitionalState) { - SpinWait spin; + SpinWait spin = new SpinWait(); do { spin.SpinOnce(); } while (m_state == TransitionalState); @@ -87,17 +87,6 @@ #endregion - protected abstract Signal GetFulfillSignal(); - - #region synchronization traits - protected void WaitResult(int timeout) { - if (!(IsResolved || GetFulfillSignal().Wait(timeout))) - throw new TimeoutException(); - } - - - #endregion - #region handlers managment protected void AddHandler(THandler handler) {
--- a/Implab/AbstractPromise.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,126 +0,0 @@ -using System; -using System.Diagnostics; -using System.Reflection; -using Implab.Parallels; - -namespace Implab { - public class AbstractPromise : AbstractEvent<IResolvable>, IPromise { - - class ResolvableSignal : IResolvable { - public Signal Signal { get; private set; } - public ResolvableSignal() { - Signal = new Signal(); - } - - - public void Reject(Exception error) { - Signal.Set(); - } - - public void Resolve() { - Signal.Set(); - } - } - - PromiseState m_state; - - Exception m_error; - - public bool IsRejected { - get { - return m_state == PromiseState.Rejected; - } - } - - public bool IsFulfilled { - get { - return m_state == PromiseState.Fulfilled; - } - } - - public Exception RejectReason { - get { - return m_error; - } - } - - - internal void Resolve() { - if (BeginTransit()) - CompleteResolve(); - } - - internal void Reject(Exception reason) { - if (BeginTransit()) { - m_error = reason; - m_state = PromiseState.Rejected; - CompleteTransit(); - } - } - - - #region implemented abstract members of AbstractPromise - - protected override void SignalHandler(IResolvable handler) { - switch (m_state) { - case PromiseState.Fulfilled: - handler.Resolve(); - break; - case PromiseState.Rejected: - handler.Reject(RejectReason); - break; - default: - throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); - } - } - - protected override Signal GetFulfillSignal() { - var next = new ResolvableSignal(); - Then(next); - return next.Signal; - } - - #endregion - - protected void CompleteResolve() { - m_state = PromiseState.Fulfilled; - CompleteTransit(); - } - - public Type ResultType { - get { - return typeof(void); - } - } - - - protected void Rethrow() { - Debug.Assert(m_error != null); - if (m_error is OperationCanceledException) - throw new OperationCanceledException("Operation cancelled", m_error); - else - throw new TargetInvocationException(m_error); - } - - public void Then(IResolvable next) { - AddHandler(next); - } - - public IPromise<T> Cast<T>() { - throw new InvalidCastException(); - } - - public void Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - } - - public void Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - } - } -} -
--- a/Implab/AbstractPromise`1.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,161 +0,0 @@ -using System; -using System.Diagnostics; -using System.Reflection; -using Implab.Parallels; - -namespace Implab { - public class AbstractPromise<T> : AbstractEvent<IResolvable<T>>, IPromise<T> { - - class ResolvableSignal : IResolvable<T> { - public Signal Signal { get; private set; } - public ResolvableSignal() { - Signal = new Signal(); - } - - - public void Reject(Exception error) { - Signal.Set(); - } - - public void Resolve(T result) { - Signal.Set(); - } - } - - class ResolvableWrapper : IResolvable<T> { - readonly IResolvable m_resolvable; - public ResolvableWrapper(IResolvable resolvable) { - - } - - public void Reject(Exception reason) { - m_resolvable.Reject(reason); - } - - public void Resolve(T value) { - m_resolvable.Resolve(); - } - } - - PromiseState m_state; - - T m_result; - - Exception m_error; - - public bool IsRejected { - get { - return m_state == PromiseState.Rejected; - } - } - - public bool IsFulfilled { - get { - return m_state == PromiseState.Fulfilled; - } - } - - public Exception RejectReason { - get { - return m_error; - } - } - - - internal void Resolve(T result) { - if (BeginTransit()) - CompleteResolve(); - } - - internal void Reject(Exception reason) { - if (BeginTransit()) { - m_error = reason; - m_state = PromiseState.Rejected; - CompleteTransit(); - } - } - - - #region implemented abstract members of AbstractPromise - - protected override void SignalHandler(IResolvable<T> handler) { - switch (m_state) { - case PromiseState.Fulfilled: - handler.Resolve(m_result); - break; - case PromiseState.Rejected: - handler.Reject(RejectReason); - break; - default: - throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); - } - } - - protected override Signal GetFulfillSignal() { - var next = new ResolvableSignal(); - Then(next); - return next.Signal; - } - - #endregion - - protected void CompleteResolve() { - m_state = PromiseState.Fulfilled; - CompleteTransit(); - } - - public Type ResultType { - get { - return typeof(void); - } - } - - - protected void Rethrow() { - Debug.Assert(m_error != null); - if (m_error is OperationCanceledException) - throw new OperationCanceledException("Operation cancelled", m_error); - else - throw new TargetInvocationException(m_error); - } - - public void Then(IResolvable<T> next) { - AddHandler(next); - } - - public void Then(IResolvable next) { - AddHandler(new ResolvableWrapper(next)); - } - - public IPromise<T2> Cast<T2>() { - return (IPromise<T2>)this; - } - - void IPromise.Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - } - - void IPromise.Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - } - - public T Join() { - WaitResult(-1); - if (IsRejected) - Rethrow(); - return m_result; - } - - public T Join(int timeout) { - WaitResult(timeout); - if (IsRejected) - Rethrow(); - return m_result; - } - } -} -
--- a/Implab/CancellationToken.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/CancellationToken.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,63 +3,26 @@ using Implab.Parallels; namespace Implab { - public class CancellationToken : ICancellationToken { - const int CANCEL_NOT_REQUESTED = 0; - const int CANCEL_REQUESTING = 1; - const int CANCEL_REQUESTED = 2; - - volatile int m_state = CANCEL_NOT_REQUESTED; - - Action<Exception> m_handler; - - Parallels.SimpleAsyncQueue<Action<Exception>> m_handlers; - - public bool IsCancellationRequested { - get { return m_state == CANCEL_REQUESTED; } + /// <summary> + /// The cancellation token signals to the worker that cancellation has been + /// requested, after the signal is received the worker decides wheather to + /// cancel its work or to continue. + /// </summary> + public class CancellationToken : AbstractEvent<Action<Exception>> { + public CancellationToken() { + } - - public Exception CancellationReason { - get; set; - } + + public void RequestCancellation() { - public void CancellationRequested(Action<Exception> handler) { - Safe.ArgumentNotNull(handler, nameof(handler)); - if (IsCancellationRequested) { - handler(CancellationReason); - } else { - EnqueueHandler(handler); - if (IsCancellationRequested && TryDequeueHandler(out handler)) - handler(CancellationReason); - } } - bool TryDequeueHandler(out Action<Exception> handler) { - handler = Interlocked.Exchange(ref m_handler, null); - if (handler != null) - return true; - else if (m_handlers != null) - return m_handlers.TryDequeue(out handler); - else - return false; - } + public void RequestCancellation(Exception reason) { - void EnqueueHandler(Action<Exception> handler) { - if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { - if (m_handlers == null) - // compare-exchange will fprotect from loosing already created queue - Interlocked.CompareExchange(ref m_handlers, new SimpleAsyncQueue<Action<Exception>>(), null); - m_handlers.Enqueue(handler); - } - } - - void RequestCancellation(Exception reason) { - if (Interlocked.CompareExchange(ref m_state, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED) == CANCEL_NOT_REQUESTED) { - if (reason == null) - reason = new OperationCanceledException(); - CancellationReason = reason; - m_state = CANCEL_REQUESTED; - } } + protected override void SignalHandler(Action<Exception> handler) { + throw new NotImplementedException(); + } } } \ No newline at end of file
--- a/Implab/Components/PollingComponent.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,155 +0,0 @@ -using System; -using System.Threading; -using Implab.Diagnostics; - -namespace Implab.Components { - public class PollingComponent : RunnableComponent { - readonly Timer m_timer; - readonly Func<Func<ICancellationToken, IPromise>, IPromise> m_dispatcher; - readonly TimeSpan m_interval; - - readonly object m_lock = new object(); - - ActionTask m_pending; - - protected PollingComponent(TimeSpan interval, Func<Func<ICancellationToken, IPromise>, IPromise> dispatcher, bool initialized) : base(initialized) { - m_timer = new Timer(OnInternalTick); - - m_interval = interval; - m_dispatcher = dispatcher; - } - - protected override IPromise OnStart() { - m_timer.Change(TimeSpan.Zero, m_interval); - - return base.OnStart(); - } - - void OnInternalTick(object state) { - if (StartTick()) { - try { - if (m_dispatcher != null) { - var result = m_dispatcher(OnTick); - m_pending.CancellationRequested(result.Cancel); - AwaitTick(result); - } else { - AwaitTick(OnTick(m_pending)); - } - } catch (Exception error) { - HandleTickError(error); - } - } - } - - /// <summary> - /// Checks wheather there is no running handler in the component and marks that the handler is starting. - /// </summary> - /// <returns>boolean value, true - the new tick handler may be invoked, false - a tick handler is already running or a component isn't running.</returns> - /// <remarks> - /// If the component is stopping no new handlers can be run. Every successful call to this method must be completed with either AwaitTick or HandleTickError handlers. - /// </remarks> - protected virtual bool StartTick() { - lock (m_lock) { - if (State != ExecutionState.Running || m_pending != null) - return false; - // actually the component may be in a not running state here (stopping, disposed or what ever), - // but OnStop method also locks on the same object and will handle operation in m_pending - m_pending = new ActionTask( - () => { - // only one operation is running, it's safe to assing m_pending from it - m_pending = null; - }, - ex => { - try { - OnTickError(ex); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } finally { - m_pending = null; - } - // suppress error - }, - ex => { - try { - OnTickCancel(ex); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } finally { - m_pending = null; - } - // supress cancellation - }, - false - ); - return true; - } - } - - /// <summary> - /// Awaits the tick. - /// </summary> - /// <param name="tick">Tick.</param> - /// <remarks> - /// This method is called only after StartTick method and m_pending will hold the promise which should be fulfilled. - /// </remarks> - void AwaitTick(IPromise tick) { - if (tick == null) { - m_pending.Resolve(); - } else { - tick.On( - m_pending.Resolve, - m_pending.Reject, - m_pending.CancelOperation - ); - } - } - - /// <summary> - /// Handles the tick error. - /// </summary> - /// <remarks> - /// This method is called only after StartTick method and m_pending will hold the promise which should be fulfilled. - /// </remarks> - void HandleTickError(Exception error) { - m_pending.Reject(error); - } - - protected virtual void OnTickError(Exception error) { - } - - protected virtual void OnTickCancel(Exception error) { - } - - /// <summary> - /// Invoked when the timer ticks, use this method to implement your logic - /// </summary> - protected virtual IPromise OnTick(ICancellationToken cancellationToken) { - return Promise.Success; - } - - protected override IPromise OnStop() { - m_timer.Change(-1, -1); - - // the component is in the stopping state - lock (m_lock) { - // after this lock no more pending operations could be created - var pending = m_pending; - // m_pending could be fulfilled and set to null already - if (pending != null) { - pending.Cancel(); - return pending.Then(base.OnStop); - } - } - - return base.OnStop(); - } - - protected override void Dispose(bool disposing) { - if (disposing) - m_timer.Dispose(); - - base.Dispose(disposing); - } - } -} -
--- a/Implab/Components/RunnableComponent.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,411 +0,0 @@ -using System; -using System.Diagnostics.CodeAnalysis; - -namespace Implab.Components { - public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable { - enum Commands { - Ok = 0, - Fail, - Init, - Start, - Stop, - Dispose, - Reset, - Last = Reset - } - - class StateMachine { - public static readonly ExecutionState[,] ReusableTransitions; - public static readonly ExecutionState[,] NonreusableTransitions; - - class StateBuilder { - readonly ExecutionState[,] m_states; - - public ExecutionState[,] States { - get { return m_states; } - } - public StateBuilder(ExecutionState[,] states) { - m_states = states; - } - - public StateBuilder() { - m_states = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1]; - } - - public StateBuilder Edge(ExecutionState s1, ExecutionState s2, Commands cmd) { - m_states[(int)s1, (int)cmd] = s2; - return this; - } - - public StateBuilder Clone() { - return new StateBuilder((ExecutionState[,])m_states.Clone()); - } - } - - static StateMachine() { - ReusableTransitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1]; - - var common = new StateBuilder() - .Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init) - .Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok) - .Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail) - - .Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start) - .Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok) - .Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop) - .Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop) - .Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose) - .Edge(ExecutionState.Failed, ExecutionState.Initializing, Commands.Reset) - - .Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail) - .Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose) - - .Edge(ExecutionState.Disposed, ExecutionState.Disposed, Commands.Dispose); - - var reusable = common - .Clone() - .Edge(ExecutionState.Stopping, ExecutionState.Ready, Commands.Ok); - - var nonreusable = common - .Clone() - .Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok); - - NonreusableTransitions = nonreusable.States; - ReusableTransitions = reusable.States; - - } - - readonly ExecutionState[,] m_states; - - public ExecutionState State { - get; - private set; - } - - public StateMachine(ExecutionState[,] states, ExecutionState initial) { - State = initial; - m_states = states; - } - - public bool Move(Commands cmd) { - var next = m_states[(int)State, (int)cmd]; - if (next == ExecutionState.Undefined) - return false; - State = next; - return true; - } - } - - IPromise m_pending; - Exception m_lastError; - - readonly StateMachine m_stateMachine; - readonly bool m_reusable; - public event EventHandler<StateChangeEventArgs> StateChanged; - - /// <summary> - /// Initializes component state. - /// </summary> - /// <param name="initialized">If set, the component initial state is <see cref="ExecutionState.Ready"/> and the component is ready to start, otherwise initialization is required.</param> - /// <param name="reusable">If set, the component may start after it has been stopped, otherwise the component is disposed after being stopped.</param> - protected RunnableComponent(bool initialized, bool reusable) { - m_stateMachine = new StateMachine( - reusable ? StateMachine.ReusableTransitions : StateMachine.NonreusableTransitions, - initialized ? ExecutionState.Ready : ExecutionState.Created - ); - m_reusable = reusable; - } - - /// <summary> - /// Initializes component state. The component created with this constructor is not reusable, i.e. it will be disposed after stop. - /// </summary> - /// <param name="initialized">If set, the component initial state is <see cref="ExecutionState.Ready"/> and the component is ready to start, otherwise initialization is required.</param> - protected RunnableComponent(bool initialized) : this(initialized, false) { - } - - void ThrowInvalidCommand(Commands cmd) { - if (m_stateMachine.State == ExecutionState.Disposed) - throw new ObjectDisposedException(ToString()); - - throw new InvalidOperationException(String.Format("Command {0} is not allowed in the state {1}", cmd, m_stateMachine.State)); - } - - bool MoveIfInState(Commands cmd, IPromise pending, Exception error, ExecutionState state) { - ExecutionState prev, current; - lock (m_stateMachine) { - if (m_stateMachine.State != state) - return false; - - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - - m_pending = pending; - m_lastError = error; - } - if (prev != current) - OnStateChanged(prev, current, error); - return true; - } - - bool MoveIfPending(Commands cmd, IPromise pending, Exception error, IPromise expected) { - ExecutionState prev, current; - lock (m_stateMachine) { - if (m_pending != expected) - return false; - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - m_pending = pending; - m_lastError = error; - } - if (prev != current) - OnStateChanged(prev, current, error); - return true; - } - - IPromise Move(Commands cmd, IPromise pending, Exception error) { - ExecutionState prev, current; - IPromise ret; - lock (m_stateMachine) { - prev = m_stateMachine.State; - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); - current = m_stateMachine.State; - - ret = m_pending; - m_pending = pending; - m_lastError = error; - - } - if (prev != current) - OnStateChanged(prev, current, error); - return ret; - } - - /// <summary> - /// Handles the state of the component change event, raises the <see cref="StateChanged"/> event, handles - /// the transition to the <see cref="ExecutionState.Disposed"/> state (calls <see cref="Dispose(bool)"/> method). - /// </summary> - /// <param name="previous">The previous state</param> - /// <param name="current">The current state</param> - /// <param name="error">The last error if any.</param> - /// <remarks> - /// <para> - /// If the previous state and the current state are same this method isn't called, such situiation is treated - /// as the component hasn't changed it's state. - /// </para> - /// <para> - /// When overriding this method ensure the call is made to the base implementation, otherwise it will lead to - /// the wrong behavior of the component. - /// </para> - /// </remarks> - protected virtual void OnStateChanged(ExecutionState previous, ExecutionState current, Exception error) { - StateChanged.DispatchEvent( - this, - new StateChangeEventArgs { - State = current, - LastError = error - } - ); - if (current == ExecutionState.Disposed) { - GC.SuppressFinalize(this); - Dispose(true); - } - } - - /// <summary> - /// Moves the component from running to failed state. - /// </summary> - /// <param name="error">The exception which is describing the error.</param> - protected bool Fail(Exception error) { - return MoveIfInState(Commands.Fail, null, error, ExecutionState.Running); - } - - /// <summary> - /// Tries to reset <see cref="ExecutionState.Failed"/> state to <see cref="ExecutionState.Ready"/>. - /// </summary> - /// <returns>True if component is reset to <see cref="ExecutionState.Ready"/>, false if the componet wasn't - /// in <see cref="ExecutionState.Failed"/> state.</returns> - /// <remarks> - /// This method checks the current state of the component and if it's in <see cref="ExecutionState.Failed"/> - /// moves component to <see cref="ExecutionState.Initializing"/>. - /// The <see cref="OnResetState()"/> is called and if this method completes succesfully the component moved - /// to <see cref="ExecutionState.Ready"/> state, otherwise the component is moved to <see cref="ExecutionState.Failed"/> - /// state. If <see cref="OnResetState()"/> throws an exception it will be propagated by this method to the caller. - /// </remarks> - protected bool ResetState() { - if (!MoveIfInState(Commands.Reset, null, null, ExecutionState.Failed)) - return false; - - try { - OnResetState(); - Move(Commands.Ok, null, null); - return true; - } catch (Exception err) { - Move(Commands.Fail, null, err); - throw; - } - } - - /// <summary> - /// This method is called by <see cref="ResetState"/> to reinitialize component in the failed state. - /// </summary> - /// <remarks> - /// Default implementation throws <see cref="NotImplementedException"/> which will cause the component - /// fail to reset it's state and it left in <see cref="ExecutionState.Failed"/> state. - /// If this method doesn't throw exceptions the component is moved to <see cref="ExecutionState.Ready"/> state. - /// </remarks> - protected virtual void OnResetState() { - throw new NotImplementedException(); - } - - IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IResolvable> chain) { - IPromise promise = null; - IPromise prev; - - var task = new ActionChainTask(action, null, null, true); - - Action<Exception> errorOrCancel = e => { - if (e == null) - e = new OperationCanceledException(); - MoveIfPending(Commands.Fail, null, e, promise); - throw new PromiseTransientException(e); - }; - - promise = task.Then( - () => MoveIfPending(Commands.Ok, null, null, promise), - errorOrCancel, - errorOrCancel - ); - - prev = Move(cmd, promise, null); - - if (prev == null) - task.Resolve(); - else - chain(prev, task); - - return promise; - } - - - #region IInitializable implementation - - public void Initialize() { - Move(Commands.Init, null, null); - - try { - OnInitialize(); - Move(Commands.Ok, null, null); - } catch (Exception err) { - Move(Commands.Fail, null, err); - throw; - } - } - - protected virtual void OnInitialize() { - } - - #endregion - - #region IRunnable implementation - - public IPromise Start() { - return InvokeAsync(Commands.Start, OnStart, null); - } - - protected virtual IPromise OnStart() { - return Promise.Success; - } - - public IPromise Stop() { - return InvokeAsync(Commands.Stop, OnStop, StopPending); - } - - protected virtual IPromise OnStop() { - return Promise.Success; - } - - /// <summary> - /// Stops the current operation if one exists. - /// </summary> - /// <param name="current">Current.</param> - /// <param name="stop">Stop.</param> - protected virtual void StopPending(IPromise current, IResolvable stop) { - if (current == null) { - stop.Resolve(); - } else { - // связваем текущую операцию с операцией остановки - current.On( - stop.Resolve, // если текущая операция заверщилась, то можно начинать остановку - stop.Reject, // если текущая операция дала ошибку - то все плохо, нельзя продолжать - e => stop.Resolve() // если текущая отменилась, то можно начинать остановку - ); - // посылаем текущей операции сигнал остановки - current.Cancel(); - } - } - - public ExecutionState State { - get { - return m_stateMachine.State; - } - } - - public Exception LastError { - get { - return m_lastError; - } - } - - #endregion - - #region IDisposable implementation - - /// <summary> - /// Releases all resource used by the <see cref="Implab.Components.RunnableComponent"/> object. - /// </summary> - /// <remarks> - /// <para>Will not try to stop the component, it will just release all resources. - /// To cleanup the component gracefully use <see cref="Stop()"/> method.</para> - /// <para> - /// In normal cases the <see cref="Dispose()"/> method shouldn't be called, the call to the <see cref="Stop()"/> - /// method is sufficient to cleanup the component. Call <see cref="Dispose()"/> only to cleanup after errors, - /// especially if <see cref="Stop"/> method is failed. Using this method insted of <see cref="Stop()"/> may - /// lead to the data loss by the component. - /// </para></remarks> - [SuppressMessage("Microsoft.Design", "CA1063:ImplementIDisposableCorrectly", Justification = "Dipose(bool) and GC.SuppessFinalize are called")] - public void Dispose() { - Move(Commands.Dispose, null, null); - } - - ~RunnableComponent() { - Dispose(false); - } - - #endregion - - /// <summary> - /// Releases all resources used by the component, called automatically, override this method to implement your cleanup. - /// </summary> - /// <param name="disposing">true if this method is called during normal dispose process.</param> - /// <param name="pending">The operation which is currenty pending</param> - protected virtual void Dispose(bool disposing) { - } - - } -} -
--- a/Implab/Deferred.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/Deferred.cs Tue Jan 30 01:37:17 2018 +0300 @@ -7,10 +7,13 @@ /// </summary> public class Deferred : IResolvable { - readonly AbstractPromise m_promise; + readonly Promise m_promise; readonly IDispatcher m_dispatcher; - internal Deferred(AbstractPromise promise, IDispatcher dispatcher) { + internal Deferred(IDispatcher dispatcher) : this(new Promise(), dispatcher) { + } + + internal Deferred(Promise promise, IDispatcher dispatcher) { Debug.Assert(promise != null); m_promise = promise; m_dispatcher = dispatcher; @@ -21,11 +24,14 @@ } public void Reject(Exception error) { - m_promise.Reject(error); + if (error is PromiseTransientException) + error = ((PromiseTransientException)error).InnerException; + + m_promise.RejectPromise(error); } public void Resolve() { - m_promise.Resolve(); + m_promise.ResolvePromise(); } public void Resolve(IPromise thenable) { @@ -36,7 +42,7 @@ else if (m_dispatcher != null) // dispatch (see ecma-262/6.0: 25.4.1.3.2 Promise Resolve Functions) - m_dispatcher.Enqueue(() => Chain(thenable)); + m_dispatcher.Enqueue(Chain, thenable); else Chain(thenable); }
--- a/Implab/Deferred`1.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/Deferred`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,10 +3,13 @@ namespace Implab { public class Deferred<T> : IResolvable<T> { - readonly AbstractPromise<T> m_promise; + readonly Promise<T> m_promise; readonly IDispatcher m_dispatcher; - internal Deferred(AbstractPromise<T> promise, IDispatcher dispatcher) { + internal Deferred(IDispatcher dispatcher) : this(new Promise<T>(), dispatcher) { + } + + internal Deferred(Promise<T> promise, IDispatcher dispatcher) { Debug.Assert(promise != null); m_promise = promise; m_dispatcher = dispatcher; @@ -17,11 +20,14 @@ } public void Reject(Exception error) { - m_promise.Reject(error); + if (error is PromiseTransientException) + error = ((PromiseTransientException)error).InnerException; + + m_promise.RejectPromise(error); } public void Resolve(T value) { - m_promise.Resolve(value); + m_promise.ResolvePromise(value); } public void Resolve(IPromise<T> thenable) { @@ -32,7 +38,7 @@ else if (m_dispatcher != null) // dispatch (see ecma-262/6.0: 25.4.1.3.2 Promise Resolve Functions) - m_dispatcher.Enqueue(() => Chain(thenable)); + m_dispatcher.Enqueue(Chain, thenable); else Chain(thenable); }
--- a/Implab/Diagnostics/Extensions.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,41 +0,0 @@ -namespace Implab.Diagnostics { - public static class Extensions { - public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) { - Safe.ArgumentNotNull(promise, "promise"); - var op = TraceContext.Instance.DetachLogicalOperation(); - - return promise.On( - x => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceInformation("promise = {0}", x); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, - err =>{ - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceError("promise died {0}", err); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, - reason => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.TraceInformation("promise cancelled {0}", reason == null ? "<no-reason>" : reason.Message); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - } - ); - } - - public static IPromise EndLogicalOperation(this IPromise promise) { - Safe.ArgumentNotNull(promise, "promise"); - var op = TraceContext.Instance.DetachLogicalOperation(); - - return promise.On(() => { - TraceContext.Instance.EnterLogicalOperation(op,true); - TraceLog.EndLogicalOperation(); - TraceContext.Instance.Leave(); - }, PromiseEventType.All); - } - } -} -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ExceptionHelpers.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,18 @@ +using System; +using System.Reflection; +using System.Runtime.ExceptionServices; + +namespace Implab { + public static class ExceptionHelpers { + public static void Rethrow(this Exception that) { + ExceptionDispatchInfo.Capture(that).Throw(); + } + + public static void ThrowInvocationException(this Exception that) { + if (that is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", that); + else + throw new TargetInvocationException(that); + } + } +} \ No newline at end of file
--- a/Implab/IDispatcher.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/IDispatcher.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,5 +3,7 @@ namespace Implab { public interface IDispatcher { void Enqueue(Action job); + + void Enqueue<T>(Action<T> job, T arg); } } \ No newline at end of file
--- a/Implab/Parallels/ArrayTraits.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,207 +0,0 @@ -using Implab.Diagnostics; -using System; -using System.Diagnostics; -using System.Threading; - -namespace Implab.Parallels { - public static class ArrayTraits { - class ArrayIterator<TSrc> : DispatchPool<int> { - readonly Action<TSrc> m_action; - readonly TSrc[] m_source; - readonly Promise<int> m_promise = new Promise<int>(); - readonly LogicalOperation m_logicalOperation; - - int m_pending; - int m_next; - - public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) - : base(threads) { - - Debug.Assert(source != null); - Debug.Assert(action != null); - - m_logicalOperation = TraceContext.Instance.CurrentOperation; - m_next = 0; - m_source = source; - m_pending = source.Length; - m_action = action; - - m_promise.On(Dispose, PromiseEventType.All); - - InitPool(); - } - - public Promise<int> Promise { - get { - return m_promise; - } - } - - protected override void Worker() { - TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); - try { - base.Worker(); - } finally { - TraceContext.Instance.Leave(); - } - } - - protected override bool TryDequeue(out int unit) { - unit = Interlocked.Increment(ref m_next) - 1; - return unit < m_source.Length; - } - - protected override void InvokeUnit(int unit) { - try { - m_action(m_source[unit]); - var pending = Interlocked.Decrement(ref m_pending); - if (pending == 0) - m_promise.Resolve(m_source.Length); - } catch (Exception e) { - m_promise.Reject(e); - } - } - } - - class ArrayMapper<TSrc, TDst>: DispatchPool<int> { - readonly Func<TSrc, TDst> m_transform; - readonly TSrc[] m_source; - readonly TDst[] m_dest; - readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); - readonly LogicalOperation m_logicalOperation; - - int m_pending; - int m_next; - - public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) - : base(threads) { - - Debug.Assert (source != null); - Debug.Assert( transform != null); - - m_next = 0; - m_source = source; - m_dest = new TDst[source.Length]; - m_pending = source.Length; - m_transform = transform; - m_logicalOperation = TraceContext.Instance.CurrentOperation; - - m_promise.On(Dispose, PromiseEventType.All); - - InitPool(); - } - - public Promise<TDst[]> Promise { - get { - return m_promise; - } - } - - protected override void Worker() { - TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); - try { - base.Worker(); - } finally { - TraceContext.Instance.Leave(); - } - } - - protected override bool TryDequeue(out int unit) { - unit = Interlocked.Increment(ref m_next) - 1; - return unit < m_source.Length; - } - - protected override void InvokeUnit(int unit) { - try { - m_dest[unit] = m_transform(m_source[unit]); - var pending = Interlocked.Decrement(ref m_pending); - if (pending == 0) - m_promise.Resolve(m_dest); - } catch (Exception e) { - m_promise.Reject(e); - } - } - } - - public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (transform == null) - throw new ArgumentNullException("transform"); - - var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); - return mapper.Promise; - } - - public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (action == null) - throw new ArgumentNullException("action"); - - var iter = new ArrayIterator<TSrc>(source, action, threads); - return iter.Promise; - } - - public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) { - if (source == null) - throw new ArgumentNullException("source"); - if (transform == null) - throw new ArgumentNullException("transform"); - if (threads <= 0) - throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); - - if (source.Length == 0) - return Promise<TDst[]>.FromResult(new TDst[0]); - - var promise = new Promise<TDst[]>(); - var res = new TDst[source.Length]; - var pending = source.Length; - - object locker = new object(); - int slots = threads; - - // Analysis disable AccessToDisposedClosure - AsyncPool.RunThread<int>(() => { - for (int i = 0; i < source.Length; i++) { - if(promise.IsFulfilled) - break; // stop processing in case of error or cancellation - var idx = i; - - if (Interlocked.Decrement(ref slots) < 0) { - lock(locker) { - while(slots < 0) - Monitor.Wait(locker); - } - } - - try { - transform(source[i]) - .On( x => { - Interlocked.Increment(ref slots); - lock (locker) { - Monitor.Pulse(locker); - } - }) - .On( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - promise.Reject - ); - - } catch (Exception e) { - promise.Reject(e); - } - } - return 0; - }); - - return promise; - } - - } -}
--- a/Implab/Parallels/AsyncPool.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,155 +0,0 @@ -using Implab.Diagnostics; -using System; -using System.Threading; -using System.Linq; - -namespace Implab.Parallels { - /// <summary> - /// Класс для распаралеливания задач. - /// </summary> - /// <remarks> - /// Используя данный класс и лямда выражения можно распараллелить - /// вычисления, для этого используется концепция обещаний. - /// </remarks> - public static class AsyncPool { - - public static IPromise<T> Invoke<T>(Func<T> func) { - var p = new Promise<T>(); - var caller = TraceContext.Instance.CurrentOperation; - - ThreadPool.QueueUserWorkItem(param => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func()); - } catch(Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return p; - } - - public static IPromise<T> Invoke<T>(Func<ICancellationToken, T> func) { - var p = new Promise<T>(); - var caller = TraceContext.Instance.CurrentOperation; - - ThreadPool.QueueUserWorkItem(param => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func(p)); - } catch(Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return p; - } - - public static IPromise<T> RunThread<T>(Func<T> func) { - var p = new Promise<T>(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func()); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise<T> RunThread<T>(Func<ICancellationToken, T> func) { - var p = new Promise<T>(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - p.Resolve(func(p)); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - - public static IPromise RunThread(Action func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - func(); - p.Resolve(); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise RunThread(Action<ICancellationToken> func) { - var p = new Promise(); - - var caller = TraceContext.Instance.CurrentOperation; - - var worker = new Thread(() => { - TraceContext.Instance.EnterLogicalOperation(caller,false); - try { - func(p); - p.Resolve(); - } catch (Exception e) { - p.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - worker.IsBackground = true; - worker.Start(); - - return p; - } - - public static IPromise[] RunThread(params Action[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise[] RunThread(params Action<ICancellationToken>[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - - public static IPromise<T>[] RunThread<T>(params Func<ICancellationToken, T>[] func) { - return func.Select(f => RunThread(f)).ToArray(); - } - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/SyncContextDispatcher.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,20 @@ +using System; +using System.Threading; + +namespace Implab { + public class SyncContextDispatcher : IDispatcher { + SynchronizationContext m_context; + public SyncContextDispatcher(SynchronizationContext context) { + Safe.ArgumentNotNull(context, nameof(context)); + m_context = context; + } + + public void Enqueue(Action job) { + m_context.Post((o) => job(), null); + } + + public void Enqueue<T>(Action<T> job, T arg) { + m_context.Post((o) => job((T)o), arg); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/ThreadPoolDispatcher.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,20 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class ThreadPoolDispatcher : IDispatcher { + + public static ThreadPoolDispatcher Instance { get; private set; } = new ThreadPoolDispatcher(); + + private ThreadPoolDispatcher() { + } + + public void Enqueue(Action job) { + ThreadPool.QueueUserWorkItem((o) => job(), null); + } + + public void Enqueue<T>(Action<T> job, T arg) { + ThreadPool.QueueUserWorkItem((o) => job((T)o), arg); + } + } +} \ No newline at end of file
--- a/Implab/Parallels/WorkerPool.cs Fri Jan 26 18:46:27 2018 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,152 +0,0 @@ -using System; -using System.Threading; -using System.Diagnostics; -using Implab.Diagnostics; - -namespace Implab.Parallels { - public class WorkerPool : DispatchPool<Action> { - - AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); - int m_queueLength; - readonly int m_threshold = 1; - - public WorkerPool(int minThreads, int maxThreads, int threshold) - : base(minThreads, maxThreads) { - m_threshold = threshold; - InitPool(); - } - - public WorkerPool(int minThreads, int maxThreads) : - base(minThreads, maxThreads) { - InitPool(); - } - - public WorkerPool(int threads) - : base(threads) { - InitPool(); - } - - public WorkerPool() { - InitPool(); - } - - public IPromise<T> Invoke<T>(Func<T> task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new FuncTask<T>(task, null, null, true); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - - promise.Resolve(); - - TraceContext.Instance.Leave(); - }); - - return promise; - } - - public IPromise Invoke(Action task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new ActionTask(task, null, null, true); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - - promise.Resolve(); - - TraceContext.Instance.Leave(); - }); - - return promise; - } - - public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new Promise<T>(); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - try { - if (!promise.CancelOperationIfRequested()) - promise.Resolve(task(promise)); - } catch (Exception e) { - promise.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return promise; - } - - public IPromise Invoke<T>(Action<ICancellationToken> task) { - if (task == null) - throw new ArgumentNullException("task"); - if (IsDisposed) - throw new ObjectDisposedException(ToString()); - - var promise = new Promise(); - - var lop = TraceContext.Instance.CurrentOperation; - - EnqueueTask(delegate { - TraceContext.Instance.EnterLogicalOperation(lop, false); - try { - if (!promise.CancelOperationIfRequested()) { - task(promise); - promise.Resolve(); - } - } catch (Exception e) { - promise.Reject(e); - } finally { - TraceContext.Instance.Leave(); - } - }); - - return promise; - } - - protected void EnqueueTask(Action unit) { - Debug.Assert(unit != null); - var len = Interlocked.Increment(ref m_queueLength); - m_queue.Enqueue(unit); - - if (len > m_threshold * PoolSize) { - StartWorker(); - } - - SignalThread(); - } - - protected override bool TryDequeue(out Action unit) { - if (m_queue.TryDequeue(out unit)) { - Interlocked.Decrement(ref m_queueLength); - return true; - } - return false; - } - - protected override void InvokeUnit(Action unit) { - unit(); - } - - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Promise.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Reflection; +using System.Threading.Tasks; +using Implab.Parallels; + +namespace Implab { + public class Promise : AbstractEvent<IResolvable>, IPromise { + public static IDispatcher DefaultDispatcher { + get { + return ThreadPoolDispatcher.Instance; + } + } + + class ResolvableSignal : IResolvable { + public Signal Signal { get; private set; } + public ResolvableSignal() { + Signal = new Signal(); + } + + + public void Reject(Exception error) { + Signal.Set(); + } + + public void Resolve() { + Signal.Set(); + } + } + + PromiseState m_state; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsFulfilled { + get { + return m_state == PromiseState.Fulfilled; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + + internal Promise() { + + } + + internal void ResolvePromise() { + if (BeginTransit()) { + m_state = PromiseState.Fulfilled; + CompleteTransit(); + } + } + + internal void RejectPromise(Exception reason) { + if (BeginTransit()) { + m_error = reason; + m_state = PromiseState.Rejected; + CompleteTransit(); + } + } + + + #region implemented abstract members of AbstractPromise + + protected override void SignalHandler(IResolvable handler) { + switch (m_state) { + case PromiseState.Fulfilled: + handler.Resolve(); + break; + case PromiseState.Rejected: + handler.Reject(RejectReason); + break; + default: + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); + } + } + + protected void WaitResult(int timeout) { + if (!(IsResolved || GetFulfillSignal().Wait(timeout))) + throw new TimeoutException(); + } + + protected Signal GetFulfillSignal() { + var next = new ResolvableSignal(); + Then(next); + return next.Signal; + } + + #endregion + + + public Type ResultType { + get { + return typeof(void); + } + } + + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + + public void Then(IResolvable next) { + AddHandler(next); + } + + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); + } + + public void Join() { + WaitResult(-1); + if (IsRejected) + Rethrow(); + } + + public void Join(int timeout) { + WaitResult(timeout); + if (IsRejected) + Rethrow(); + } + + public static ResolvedPromise Resolve() { + return new ResolvedPromise(); + } + + public static ResolvedPromise<T> Resolve<T>(T result) { + return new ResolvedPromise<T>(result); + } + + public static RejectedPromise Reject(Exception reason) { + return new RejectedPromise(reason); + } + + public static RejectedPromise<T> Reject<T>(Exception reason) { + return new RejectedPromise<T>(reason); + } + + public static IPromise Create(PromiseExecutor executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise(); + var d = new Deferred(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise<T> Create<T>(PromiseExecutor<T> executor) { + Safe.ArgumentNotNull(executor, nameof(executor)); + + var p = new Promise<T>(); + var d = new Deferred<T>(p, DefaultDispatcher); + + try { + executor(d); + } catch (Exception e) { + d.Reject(e); + } + + return d.Promise; + } + + public static IPromise All(IEnumerable<IPromise> promises) { + var d = new Deferred(DefaultDispatcher); + var all = new PromiseAll(d); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + + public static IPromise<T[]> All<T>(IEnumerable<IPromise<T>> promises, Func<T, IPromise> cleanup, Action cancel) { + var d = new Deferred<T[]>(DefaultDispatcher); + var all = new PromiseAll<T>(d, cleanup, cancel); + foreach (var promise in promises) { + all.AddPromise(promise); + if (all.Done) + break; + } + all.Complete(); + return all.ResultPromise; + } + } +} +
--- a/Implab/PromiseActionReaction.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseActionReaction.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,63 +3,48 @@ namespace Implab { class PromiseActionReaction : PromiseReaction { - readonly Action m_fulfilled; - - readonly Action<Exception> m_rejected; readonly Deferred m_next; - public PromiseActionReaction(Action fulfilled, Action<Exception> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseActionReaction(Action fulfilled, Action<Exception> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { - fulfilled(); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; - } - - public PromiseActionReaction(Func<IPromise> fulfilled, Func<Exception, IPromise> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Action fulfilled, Func<Exception, IPromise> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Func<IPromise> fulfilled, Func<Exception, IPromise> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { - fulfilled(); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func<IPromise> fulfilled, Action<Exception> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Action fulfilled, Func<Exception, IPromise> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } + public PromiseActionReaction(Func<IPromise> fulfilled, Action<Exception> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasFulfilHandler => m_fulfilled != null; - - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -68,21 +53,5 @@ protected override void DefaultResolve() { m_next.Resolve(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e){ - m_next.Reject(e); - } - } - - protected override void ResolveImpl() { - try { - m_fulfilled(); - } catch (Exception e){ - m_next.Reject(e); - } - } } } \ No newline at end of file
--- a/Implab/PromiseActionReaction`1.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseActionReaction`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,62 +3,47 @@ namespace Implab { class PromiseActionReaction<T> : PromiseReaction<T> { - readonly Action<T> m_fulfilled; - - readonly Action<Exception> m_rejected; - readonly Deferred m_next; - public PromiseActionReaction(Action<T> fulfilled, Action<Exception> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise Promise { + get { return m_next.Promise; } + } + + public PromiseActionReaction(Action<T> fulfilled, Action<Exception> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { - fulfilled(x); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func<T, IPromise> fulfilled, Func<Exception, IPromise> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Func<T, IPromise> fulfilled, Func<Exception, IPromise> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { next.Resolve(fulfilled(x)); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; - } - - public PromiseActionReaction(Action<T> fulfilled, Func<Exception, IPromise> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = (x) => { - fulfilled(x); - next.Resolve(); - }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseActionReaction(Func<T, IPromise> fulfilled, Action<Exception> rejected, Deferred next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseActionReaction(Action<T> fulfilled, Func<Exception, IPromise> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); if (fulfilled != null) - m_fulfilled = (x) => { next.Resolve(fulfilled(x)); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (x) => { - rejected(x); - next.Resolve(); - }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - protected override bool HasFulfilHandler => m_fulfilled != null; + public PromiseActionReaction(Func<T, IPromise> fulfilled, Action<Exception> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -67,21 +52,5 @@ protected override void DefaultResolve(T result) { m_next.Resolve(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e) { - m_next.Reject(e); - } - } - - protected override void ResolveImpl(T result) { - try { - m_fulfilled(result); - } catch (Exception e) { - m_next.Reject(e); - } - } } } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseAll.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,40 @@ +using System; +using System.Threading; + +namespace Implab +{ + class PromiseAll : IResolvable { + int m_count; + + readonly Deferred m_deferred; + + public bool Done { + get { return m_deferred.Promise.IsResolved; } + } + + public IPromise ResultPromise { + get { return m_deferred.Promise; } + } + + public void AddPromise(IPromise promise) { + Interlocked.Increment(ref m_count); + } + + public PromiseAll(Deferred deferred) { + m_deferred = deferred; + } + + public void Resolve() { + if (Interlocked.Decrement(ref m_count) == 0) + m_deferred.Resolve(); + } + + public void Complete() { + Resolve(); + } + + public void Reject(Exception error) { + m_deferred.Reject(error); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseAll`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,90 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace Implab { + class PromiseAll<T> : IResolvable { + + int m_count; + + readonly List<IPromise<T>> m_promises = new List<IPromise<T>>(); + + readonly Deferred<T[]> m_deferred; + + IPromise<T[]> m_result; + + readonly Func<T, IPromise> m_cleanup; + + readonly Action m_cancel; + + public bool Done { + get { return m_deferred.Promise.IsResolved && m_cleanup == null; } + } + + public IPromise<T[]> ResultPromise { + get { return m_result; } + } + + public void AddPromise(IPromise<T> promise) { + Interlocked.Increment(ref m_count); + promise.Then(this); + } + + public PromiseAll(Deferred<T[]> deferred, Func<T, IPromise> cleanup, Action cancel) { + m_deferred = deferred; + m_cancel = cancel; + m_cleanup = cleanup; + } + + public void Resolve() { + if (Interlocked.Decrement(ref m_count) == 0) + m_deferred.Resolve(GetResults()); + } + + public void Reject(Exception error) { + m_deferred.Reject(error); + } + + public void Complete() { + if (m_cancel != null || m_cleanup != null) + m_result = m_deferred.Promise.Catch(CleanupResults); + else + m_result = m_deferred.Promise; + } + + IPromise<T[]> CleanupResults(Exception reason) { + var errors = new List<Exception>(); + errors.Add(reason); + + if (m_cancel != null) + try { + m_cancel(); + } catch (Exception e) { + errors.Add(e); + } + + if (m_cleanup != null) { + return Promise.All( + m_promises.Select(p => p + .Then(m_cleanup, e => { }) + .Catch(e => { + errors.Add(e); + }) + ) + ).Then<T[]>(new Func<T[]>(() => { + throw new AggregateException(errors); + }), (Func<Exception, T[]>)null); + } else { + return Promise.Reject<T[]>(errors.Count > 1 ? new AggregateException(errors) : reason); + } + } + + T[] GetResults() { + var results = new T[m_promises.Count]; + for (var i = 0; i < results.Length; i++) + results[i] = m_promises[i].Join(); + return results; + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseAwaiter.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,62 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using Implab.Parallels; + +namespace Implab +{ + public struct PromiseAwaiter : INotifyCompletion { + class PromiseEvent : IResolvable { + IDispatcher m_dispatcher; + + Action m_handler; + + public PromiseEvent(Action handler, IDispatcher dispatcher) { + m_handler = handler; + m_dispatcher = dispatcher; + } + + public void Resolve() { + m_dispatcher.Enqueue(m_handler); + } + + public void Reject(Exception error) { + m_dispatcher.Enqueue(m_handler); + } + } + + readonly IPromise m_promise; + readonly IDispatcher m_dispatcher; + + public PromiseAwaiter(IPromise promise, IDispatcher dispatcher) { + m_promise = promise; + m_dispatcher = dispatcher; + } + + public PromiseAwaiter(IPromise promise) { + m_promise = promise; + m_dispatcher = GetDispatcher(); + } + + public void OnCompleted (Action continuation) { + if (m_promise != null) + m_promise.Then(new PromiseEvent(continuation, GetDispatcher())); + } + + public void GetResult() { + m_promise.Join(); + } + + static IDispatcher GetDispatcher() { + if(SynchronizationContext.Current == null) + return ThreadPoolDispatcher.Instance; + return new SyncContextDispatcher(SynchronizationContext.Current); + } + + public bool IsCompleted { + get { + return m_promise.IsResolved; + } + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseAwaiter`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,62 @@ +using System; +using System.Runtime.CompilerServices; +using System.Threading; +using Implab.Parallels; + +namespace Implab { + public struct PromiseAwaiter<T> : INotifyCompletion { + class PromiseEvent : IResolvable<T> { + IDispatcher m_dispatcher; + + Action m_handler; + + public PromiseEvent(Action handler, IDispatcher dispatcher) { + m_handler = handler; + m_dispatcher = dispatcher; + } + + public void Resolve(T result) { + m_dispatcher.Enqueue(m_handler); + } + + public void Reject(Exception error) { + m_dispatcher.Enqueue(m_handler); + } + } + + readonly IPromise<T> m_promise; + + readonly IDispatcher m_dispatcher; + + public PromiseAwaiter(IPromise<T> promise) { + m_promise = promise; + m_dispatcher = GetDispatcher(); + } + + public PromiseAwaiter(IPromise<T> promise, IDispatcher dispatcher) { + m_promise = promise; + m_dispatcher = dispatcher; + } + + public void OnCompleted(Action continuation) { + if (m_promise != null) + m_promise.Then(new PromiseEvent(continuation, GetDispatcher())); + } + + public T GetResult() { + return m_promise.Join(); + } + + static IDispatcher GetDispatcher() { + if (SynchronizationContext.Current == null) + return ThreadPoolDispatcher.Instance; + return new SyncContextDispatcher(SynchronizationContext.Current); + } + + public bool IsCompleted { + get { + return m_promise.IsResolved; + } + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseExecutor.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,3 @@ +namespace Implab { + public delegate void PromiseExecutor(Deferred deferred); +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseExecutor`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,3 @@ +namespace Implab { + public delegate void PromiseExecutor<T>(Deferred<T> d); +} \ No newline at end of file
--- a/Implab/PromiseExtensions.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseExtensions.cs Tue Jan 30 01:37:17 2018 +0300 @@ -2,474 +2,130 @@ using System; using Implab.Diagnostics; using System.Collections.Generic; -using System.Linq; - +using System.Linq; + namespace Implab { public static class PromiseExtensions { - public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) { - Safe.ArgumentNotNull(that, "that"); - var context = SynchronizationContext.Current; - if (context == null) - return that; - var p = new SyncContextPromise<T>(context); - p.CancellationRequested(that.Cancel); - - that.On( - p.Resolve, - p.Reject, - p.CancelOperation - ); - return p; - } - - public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(context, "context"); - - var p = new SyncContextPromise<T>(context); - p.CancellationRequested(that.Cancel); - - that.On( - p.Resolve, - p.Reject, - p.CancelOperation - ); - return p; - } - - /// <summary> - /// Ensures the dispatched. - /// </summary> - /// <returns>The dispatched.</returns> - /// <param name="that">That.</param> - /// <param name="head">Head.</param> - /// <param name="cleanup">Cleanup.</param> - /// <typeparam name="TPromise">The 1st type parameter.</typeparam> - /// <typeparam name="T">The 2nd type parameter.</typeparam> - public static TPromise EnsureDispatched<TPromise, T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(head, "head"); - - that.On(() => head.On(cleanup), PromiseEventType.Cancelled); - - return that; + public static IPromise Then(this IPromise that, Action fulfilled, Action<Exception> rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - /// <summary> - /// Adds a cancellation point to the chain of promises. When a cancellation request reaches the cancellation point the operation is - /// cancelled immediatelly, and the request is passed towards. If the operation at the higher level can not be cancelled is't result - /// will be collected with <paramref name="cleanup"/> callback. - /// </summary> - /// <typeparam name="T">The type of the promise result.</typeparam> - /// <param name="that">The promise to which the cancellation point should be attached.</param> - /// <param name="cleanup">The callback which is used to cleanup the result of the operation if the cancellation point is cancelled already.</param> - /// <returns>The promise</returns> - public static IPromise<T> CancellationPoint<T>(this IPromise<T> that, Action<T> cleanup) { - var meduim = new Promise<T>(); - - that.On(meduim.Resolve, meduim.Reject, meduim.CancelOperation); - - meduim.CancellationRequested(that.Cancel); - meduim.CancellationRequested(meduim.CancelOperation); - - if (cleanup != null) - meduim.On((Action<T>)null, null, (e) => { - that.On(cleanup); - }); - - return meduim; + public static IPromise Then(this IPromise that, Action fulfilled, Func<Exception, IPromise> rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult, T> callback) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(callback, "callback"); - var op = TraceContext.Instance.CurrentOperation; - return ar => { - TraceContext.Instance.EnterLogicalOperation(op, false); - try { - that.Resolve(callback(ar)); - } catch (Exception err) { - that.Reject(err); - } finally { - TraceContext.Instance.Leave(); - } - }; + public static IPromise Then(this IPromise that, Func<IPromise> fulfilled, Action<Exception> rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - static void CancelByTimeoutCallback(object cookie) { - ((ICancellable)cookie).Cancel(new TimeoutException()); - } - - /// <summary> - /// Cancells promise after the specified timeout is elapsed. - /// </summary> - /// <param name="that">The promise to cancel on timeout.</param> - /// <param name="milliseconds">The timeout in milliseconds.</param> - /// <typeparam name="TPromise">The 1st type parameter.</typeparam> - public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise { - Safe.ArgumentNotNull(that, "that"); - var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1); - that.On(timer.Dispose, PromiseEventType.All); - return that; + public static IPromise Then(this IPromise that, Func<IPromise> fulfilled, Func<Exception, IPromise> rejected) { + var reaction = new PromiseActionReaction(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise PromiseAll(this IEnumerable<IPromise> that) { - Safe.ArgumentNotNull(that, "that"); - return PromiseAll(that.ToList()); - } - - public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that) { - return PromiseAll(that, null); - } - - public static IPromise<T[]> PromiseAll<T>(this IEnumerable<IPromise<T>> that, Action<T> cleanup) { - Safe.ArgumentNotNull(that, "that"); - return PromiseAll(that.ToList(), cleanup); + public static IPromise Then<T>(this IPromise<T> that, Action<T> fulfilled, Action<Exception> rejected) { + var reaction = new PromiseActionReaction<T>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise PromiseAll(this ICollection<IPromise> that) { - Safe.ArgumentNotNull(that, "that"); - - int count = that.Count; - int errors = 0; - var medium = new Promise(); - - if (count == 0) { - medium.Resolve(); - return medium; - } - - medium.On(() => { - foreach (var p2 in that) - p2.Cancel(); - }, PromiseEventType.ErrorOrCancel); - - foreach (var p in that) - p.On( - () => { - if (Interlocked.Decrement(ref count) == 0) - medium.Resolve(); - }, - error => { - if (Interlocked.Increment(ref errors) == 1) - medium.Reject( - new Exception("The dependency promise is failed", error) - ); - }, - reason => { - if (Interlocked.Increment(ref errors) == 1) - medium.Cancel( - new Exception("The dependency promise is cancelled") - ); - } - ); - - return medium; - } - - public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that) { - return PromiseAll(that, null); + public static IPromise Then<T>(this IPromise<T> that, Action<T> fulfilled, Func<Exception, IPromise> rejected) { + var reaction = new PromiseActionReaction<T>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - /// <summary> - /// Creates a new promise which will be satisfied when all promises are satisfied. - /// </summary> - /// <typeparam name="T"></typeparam> - /// <param name="that"></param> - /// <param name="cleanup">A callback used to cleanup already resolved promises in case of an error</param> - /// <returns></returns> - public static IPromise<T[]> PromiseAll<T>(this ICollection<IPromise<T>> that, Action<T> cleanup) { - Safe.ArgumentNotNull(that, "that"); - - int count = that.Count; - - if (count == 0) - return Promise<T[]>.FromResult(new T[0]); - - int errors = 0; - var medium = new Promise<T[]>(); - var results = new T[that.Count]; - - medium.On(() => { - foreach (var p2 in that) { - p2.Cancel(); - if (cleanup != null) - p2.On(cleanup); - } - }, PromiseEventType.ErrorOrCancel); - - int i = 0; - foreach (var p in that) { - var idx = i; - p.On( - x => { - results[idx] = x; - if (Interlocked.Decrement(ref count) == 0) - medium.Resolve(results); - }, - error => { - if (Interlocked.Increment(ref errors) == 1) - medium.Reject( - new Exception("The dependency promise is failed", error) - ); - }, - reason => { - if (Interlocked.Increment(ref errors) == 1) - medium.Cancel( - new Exception("The dependency promise is cancelled", reason) - ); - } - ); - i++; - } - - return medium; + public static IPromise Then<T>(this IPromise<T> that, Func<T, IPromise> fulfilled, Action<Exception> rejected) { + var reaction = new PromiseActionReaction<T>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise Then<T>(this IPromise<T> that, Func<T, IPromise> fulfilled, Func<Exception, IPromise> rejected) { + var reaction = new PromiseActionReaction<T>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; + } - var d = new ActionTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; + public static IPromise<Tout> Then<Tout>(this IPromise that, Func<Tout> fulfilled, Func<Exception, Tout> rejected) { + var reaction = new PromiseFuncReaction<Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Then(this IPromise that, Action success, Action<Exception> error) { - return Then(that, success, error, null); - } - - public static IPromise Then(this IPromise that, Action success) { - return Then(that, success, null, null); - } - - public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new FuncTask<T>(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) { - return Then(that, success, error, null); - } - - public static IPromise<T> Then<T>(this IPromise that, Func<T> success) { - return Then(that, success, null, null); + public static IPromise<Tout> Then<Tout>(this IPromise that, Func<Tout> fulfilled, Func<Exception, IPromise<Tout>> rejected) { + var reaction = new PromiseFuncReaction<Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(success, "success"); - - var d = new FuncTask<T, T2>(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error, Func<Exception, T> cancel) { - Safe.ArgumentNotNull(that, "that"); - var d = new FuncTask<T, T>( - x => { - success(x); - return x; - }, - error, - cancel, - false - ); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success, Func<Exception, T> error) { - return Then(that, success, error, null); + public static IPromise<Tout> Then<Tout>(this IPromise that, Func<IPromise<Tout>> fulfilled, Func<Exception, Tout> rejected) { + var reaction = new PromiseFuncReaction<Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise<T> Then<T>(this IPromise<T> that, Action<T> success) { - return Then(that, success, null, null); - } - - public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) { - return Then(that, success, error, null); - } - - public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) { - return Then(that, success, null, null); + public static IPromise<Tout> Then<Tout>(this IPromise that, Func<IPromise<Tout>> fulfilled, Func<Exception, IPromise<Tout>> rejected) { + var reaction = new PromiseFuncReaction<Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise<T> Always<T>(this IPromise<T> that, Action handler) { - Func<Exception, T> errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then( - that, - x => { - handler(); - return x; - }, - errorOrCancel, - errorOrCancel); + public static IPromise<Tout> Then<Tin, Tout>(this IPromise<Tin> that, Func<Tin, Tout> fulfilled, Func<Exception, Tout> rejected) { + var reaction = new PromiseFuncReaction<Tin, Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Always(this IPromise that, Action handler) { - Action<Exception> errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then( - that, - handler, - errorOrCancel, - errorOrCancel); - } - - public static IPromise Error(this IPromise that, Action<Exception> handler, bool handleCancellation) { - Action<Exception> errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(e); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null); + public static IPromise<Tout> Then<Tin, Tout>(this IPromise<Tin> that, Func<Tin, Tout> fulfilled, Func<Exception, IPromise<Tout>> rejected) { + var reaction = new PromiseFuncReaction<Tin, Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Error(this IPromise that, Action<Exception> handler) { - return Error(that, handler, false); - } - - public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler, bool handleCancellation) { - Func<Exception, T> errorOrCancel; - if (handler != null) - errorOrCancel = e => { - handler(e); - throw new PromiseTransientException(e); - }; - else - errorOrCancel = null; - - return Then(that, null, errorOrCancel, handleCancellation ? errorOrCancel : null); + public static IPromise<Tout> Then<Tin, Tout>(this IPromise<Tin> that, Func<Tin, IPromise<Tout>> fulfilled, Func<Exception, Tout> rejected) { + var reaction = new PromiseFuncReaction<Tin, Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise<T> Error<T>(this IPromise<T> that, Action<Exception> handler) { - return Error(that, handler, false); - } - - #region chain traits - public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new ActionChainTask(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception, IPromise> error) { - return Chain(that, success, error, null); + public static IPromise<Tout> Then<Tin, Tout>(this IPromise<Tin> that, Func<Tin, IPromise<Tout>> fulfilled, Func<Exception, IPromise<Tout>> rejected) { + var reaction = new PromiseFuncReaction<Tin, Tout>(fulfilled, rejected, Promise.DefaultDispatcher); + that.Then(reaction); + return reaction.Promise; } - public static IPromise Chain(this IPromise that, Func<IPromise> success) { - return Chain(that, success, null, null); - } - - public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) { - Safe.ArgumentNotNull(that, "that"); - - var d = new FuncChainTask<T>(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); - return d; - } - - public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) { - return Chain(that, success, error, null); - } - - public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) { - return Chain(that, success, null, null); + public static IPromise Catch(this IPromise that, Action<Exception> rejected) { + return Then(that, null, rejected); } - public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) { - Safe.ArgumentNotNull(that, "that"); - var d = new FuncChainTask<T, T2>(success, error, cancel, false); - that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); - return d; + public static IPromise Catch(this IPromise that, Func<Exception, IPromise> rejected) { + return Then(that, null, rejected); } - public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) { - return Chain(that, success, error, null); + public static IPromise<Tout> Catch<Tout>(this IPromise that, Func<Exception, Tout> rejected) { + return Then(that, (Func<Tout>)null, rejected); } - public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) { - return Chain(that, success, null, null); - } - - #endregion - - public static IPromise<T2> Guard<T, T2>(this IPromise<T> that, Func<IPromise<T>, IPromise<T2>> continuation, Action<T> cleanup) { - Safe.ArgumentNotNull(that, "that"); - Safe.ArgumentNotNull(continuation, "continuation"); - return continuation(that).Error((err) => { - that.On(cleanup); - }, true); - } - -#if NET_4_5 - - public static PromiseAwaiter<T> GetAwaiter<T>(this IPromise<T> that) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise<Tout> Catch<Tout>(this IPromise that, Func<Exception, IPromise<Tout>> rejected) { + return Then(that, (Func<Tout>)null, rejected); + } - return new PromiseAwaiter<T>(that); - } - - public static PromiseAwaiter GetAwaiter(this IPromise that) { - Safe.ArgumentNotNull(that, "that"); + public static IPromise<Tout> Catch<Tin, Tout>(this IPromise<Tin> that, Func<Exception, Tout> rejected) { + return Then(that, (Func<Tin, Tout>)null, rejected); + } - return new PromiseAwaiter(that); - } - - public static IPromise BoundCancellationToken(this IPromise that, CancellationToken ct) { - Safe.ArgumentNotNull(that, "that"); - ct.Register(that.Cancel); - return that.Then(null, null, (err) => { - ct.ThrowIfCancellationRequested(); - throw new PromiseTransientException(err); - }); - } - - public static IPromise<T> BoundCancellationToken<T>(this IPromise<T> that, CancellationToken ct) { - Safe.ArgumentNotNull(that, "that"); - ct.Register(that.Cancel); - return that.Then(null, null, (err) => { - ct.ThrowIfCancellationRequested(); - throw new PromiseTransientException(err); - }); - } - -#endif + public static IPromise<Tout> Catch<Tin, Tout>(this IPromise<Tin> that, Func<Exception, IPromise<Tout>> rejected) { + return Then(that, (Func<Tin, Tout>)null, rejected); + } } }
--- a/Implab/PromiseFuncReaction`1.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseFuncReaction`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -3,51 +3,47 @@ namespace Implab { class PromiseFuncReaction<TRet> : PromiseReaction { - readonly Action m_fulfilled; - - readonly Action<Exception> m_rejected; - readonly Deferred<TRet> m_next; - public PromiseFuncReaction(Func<TRet> fulfilled, Func<Exception, TRet> rejected, Deferred<TRet> next, IDispatcher dispatcher) : base(dispatcher) { + public IPromise<TRet> Promise { + get { return m_next.Promise; } + } + + public PromiseFuncReaction(Func<TRet> fulfilled, Func<Exception, TRet> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; - } - - public PromiseFuncReaction(Func<IPromise<TRet>> fulfilled, Func<Exception, IPromise<TRet>> rejected, Deferred<TRet> next, IDispatcher dispatcher) : base(dispatcher) { - if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; - if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseFuncReaction(Func<TRet> fulfilled, Func<Exception, IPromise<TRet>> rejected, Deferred<TRet> next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseFuncReaction(Func<IPromise<TRet>> fulfilled, Func<Exception, IPromise<TRet>> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } - public PromiseFuncReaction(Func<IPromise<TRet>> fulfilled, Func<Exception, TRet> rejected, Deferred<TRet> next, IDispatcher dispatcher) : base(dispatcher) { + public PromiseFuncReaction(Func<TRet> fulfilled, Func<Exception, IPromise<TRet>> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); if (fulfilled != null) - m_fulfilled = () => { next.Resolve(fulfilled()); }; + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + if (rejected != null) - m_rejected = (e) => { next.Resolve(rejected(e)); }; - - m_next = next; + RejectHandler = PromiseHandler.Create(rejected, m_next); } + public PromiseFuncReaction(Func<IPromise<TRet>> fulfilled, Func<Exception, TRet> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); - protected override bool HasFulfilHandler => m_fulfilled != null; - - protected override bool HasRejectHandler => m_rejected != null; + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } protected override void DefaultReject(Exception reason) { m_next.Reject(reason); @@ -56,21 +52,5 @@ protected override void DefaultResolve() { throw new NotImplementedException(); } - - protected override void RejectImpl(Exception reason) { - try { - m_rejected(reason); - } catch (Exception e){ - m_next.Reject(e); - } - } - - protected override void ResolveImpl() { - try { - m_fulfilled(); - } catch (Exception e){ - m_next.Reject(e); - } - } } } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseFuncReaction`2.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,56 @@ +using System; +using System.Diagnostics; + +namespace Implab { + class PromiseFuncReaction<TIn, TRet> : PromiseReaction<TIn> { + readonly Deferred<TRet> m_next; + + public IPromise<TRet> Promise { + get { return m_next.Promise; } + } + + public PromiseFuncReaction(Func<TIn, TRet> fulfilled, Func<Exception, TRet> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func<TIn, IPromise<TRet>> fulfilled, Func<Exception, IPromise<TRet>> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func<TIn, TRet> fulfilled, Func<Exception, IPromise<TRet>> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + public PromiseFuncReaction(Func<TIn, IPromise<TRet>> fulfilled, Func<Exception, TRet> rejected, IDispatcher dispatcher) : base(dispatcher) { + m_next = new Deferred<TRet>(dispatcher); + if (fulfilled != null) + FulfilHandler = PromiseHandler.Create(fulfilled, m_next); + + if (rejected != null) + RejectHandler = PromiseHandler.Create(rejected, m_next); + } + + protected override void DefaultReject(Exception reason) { + m_next.Reject(reason); + } + + protected override void DefaultResolve(TIn result) { + m_next.Resolve((TRet)(object)result); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseHandler.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,102 @@ +using System; +using System.Diagnostics; + +namespace Implab { + class PromiseHandler { + public static Action<T> Create<T>(Action<T> handler, Deferred next) { + Debug.Assert(handler != null); + + return (v) => { + try { + handler(v); + next.Resolve(); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action<T> Create<T>(Func<T, IPromise> handler, Deferred next) { + Debug.Assert(handler != null); + + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action<T> Create<T, T2>(Func<T, T2> handler, Deferred<T2> next) { + Debug.Assert(handler != null); + + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action<T> Create<T, T2>(Func<T, IPromise<T2>> handler, Deferred<T2> next) { + Debug.Assert(handler != null); + return (v) => { + try { + next.Resolve(handler(v)); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Action handler, Deferred next) { + Debug.Assert(handler != null); + + return () => { + try { + handler(); + next.Resolve(); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create(Func<IPromise> handler, Deferred next) { + Debug.Assert(handler != null); + + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create<T2>(Func<T2> handler, Deferred<T2> next) { + Debug.Assert(handler != null); + + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + + public static Action Create<T2>(Func<IPromise<T2>> handler, Deferred<T2> next) { + Debug.Assert(handler != null); + return () => { + try { + next.Resolve(handler()); + } catch (Exception err) { + next.Reject(err); + } + }; + } + } +} \ No newline at end of file
--- a/Implab/PromiseReaction.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseReaction.cs Tue Jan 30 01:37:17 2018 +0300 @@ -1,6 +1,11 @@ using System; namespace Implab { + /// <summary> + /// Базовыйй класс для создания обработчиков результов выполнения обещаний. + /// Данный объект связывает обработчик и обешание, при этом для выполнения + /// обработчика будет использоваться диспетчер. + /// </summary> abstract class PromiseReaction : IResolvable { readonly IDispatcher m_dispatcher; @@ -8,36 +13,28 @@ m_dispatcher = dispatcher; } - protected abstract bool HasFulfilHandler { - get; - } + protected Action FulfilHandler { get; set; } - protected abstract bool HasRejectHandler { - get; - } + protected Action<Exception> RejectHandler { get; set; } public void Reject(Exception error) { - if (!HasRejectHandler) + if (RejectHandler == null) DefaultReject(error); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => RejectImpl(error)); + m_dispatcher.Enqueue(RejectHandler, error); else - RejectImpl(error); + RejectHandler(error); } public void Resolve() { - if (!HasFulfilHandler) + if (FulfilHandler == null) DefaultResolve(); else if (m_dispatcher != null) - m_dispatcher.Enqueue(ResolveImpl); + m_dispatcher.Enqueue(FulfilHandler); else - ResolveImpl(); + FulfilHandler(); } - protected abstract void ResolveImpl(); - - protected abstract void RejectImpl(Exception reason); - protected abstract void DefaultResolve(); protected abstract void DefaultReject(Exception reason);
--- a/Implab/PromiseReaction`1.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/PromiseReaction`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -1,6 +1,11 @@ using System; namespace Implab { + /// <summary> + /// Базовыйй класс для создания обработчиков результов выполнения обещаний. + /// Данный объект связывает обработчик и обешание, при этом для выполнения + /// обработчика будет использоваться диспетчер. + /// </summary> abstract class PromiseReaction<T> : IResolvable<T> { readonly IDispatcher m_dispatcher; @@ -8,36 +13,28 @@ m_dispatcher = dispatcher; } - protected abstract bool HasFulfilHandler { - get; - } + protected Action<T> FulfilHandler { get; set; } - protected abstract bool HasRejectHandler { - get; - } + protected Action<Exception> RejectHandler { get; set; } public void Reject(Exception error) { - if (!HasRejectHandler) + if (RejectHandler == null) DefaultReject(error); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => RejectImpl(error)); + m_dispatcher.Enqueue(RejectHandler, error); else - RejectImpl(error); + RejectHandler(error); } public void Resolve(T result) { - if (!HasFulfilHandler) + if (FulfilHandler == null) DefaultResolve(result); else if (m_dispatcher != null) - m_dispatcher.Enqueue(() => ResolveImpl(result)); + m_dispatcher.Enqueue(FulfilHandler, result); else - ResolveImpl(result); + FulfilHandler(result); } - protected abstract void ResolveImpl(T result); - - protected abstract void RejectImpl(Exception reason); - protected abstract void DefaultResolve(T result); protected abstract void DefaultReject(Exception reason);
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Promise`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,159 @@ +using System; +using System.Diagnostics; +using System.Reflection; +using Implab.Parallels; + +namespace Implab { + public class Promise<T> : AbstractEvent<IResolvable<T>>, IPromise<T> { + + class ResolvableSignal : IResolvable<T> { + public Signal Signal { get; private set; } + public ResolvableSignal() { + Signal = new Signal(); + } + + + public void Reject(Exception error) { + Signal.Set(); + } + + public void Resolve(T result) { + Signal.Set(); + } + } + + class ResolvableWrapper : IResolvable<T> { + readonly IResolvable m_resolvable; + public ResolvableWrapper(IResolvable resolvable) { + m_resolvable = resolvable; + } + + public void Reject(Exception reason) { + m_resolvable.Reject(reason); + } + + public void Resolve(T value) { + m_resolvable.Resolve(); + } + } + + PromiseState m_state; + + T m_result; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsFulfilled { + get { + return m_state == PromiseState.Fulfilled; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + + + internal void ResolvePromise(T result) { + if (BeginTransit()) { + m_result = result; + m_state = PromiseState.Fulfilled; + CompleteTransit(); + } + } + + internal void RejectPromise(Exception reason) { + if (BeginTransit()) { + m_error = reason; + m_state = PromiseState.Rejected; + CompleteTransit(); + } + } + + + #region implemented abstract members of AbstractPromise + + protected override void SignalHandler(IResolvable<T> handler) { + switch (m_state) { + case PromiseState.Fulfilled: + handler.Resolve(m_result); + break; + case PromiseState.Rejected: + handler.Reject(RejectReason); + break; + default: + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); + } + } + + protected void WaitResult(int timeout) { + if (!(IsResolved || GetFulfillSignal().Wait(timeout))) + throw new TimeoutException(); + } + + protected Signal GetFulfillSignal() { + var next = new ResolvableSignal(); + Then(next); + return next.Signal; + } + + #endregion + + public Type ResultType { + get { + return typeof(void); + } + } + + + protected void Rethrow() { + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + + public void Then(IResolvable<T> next) { + AddHandler(next); + } + + public void Then(IResolvable next) { + AddHandler(new ResolvableWrapper(next)); + } + + public IPromise<T2> Cast<T2>() { + return (IPromise<T2>)this; + } + + void IPromise.Join() { + Join(); + } + + void IPromise.Join(int timeout) { + Join(timeout); + } + + public T Join() { + WaitResult(-1); + if (IsRejected) + Rethrow(); + return m_result; + } + + public T Join(int timeout) { + WaitResult(timeout); + if (IsRejected) + Rethrow(); + return m_result; + } + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/RejectedPromise.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,38 @@ +using System; + +namespace Implab +{ + public struct RejectedPromise : IPromise { + readonly Exception m_reason; + + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => true; + + public bool IsFulfilled => false; + + public Exception RejectReason => m_reason; + + public RejectedPromise(Exception reason) { + m_reason = reason; + } + + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); + } + + public void Join() { + m_reason.ThrowInvocationException(); + } + + public void Join(int timeout) { + m_reason.ThrowInvocationException(); + } + + public void Then(IResolvable next) { + next.Reject(m_reason); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/RejectedPromise`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,52 @@ +using System; + +namespace Implab +{ + public struct RejectedPromise<T> : IPromise<T> { + readonly Exception m_reason; + + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => true; + + public bool IsFulfilled => false; + + public Exception RejectReason => m_reason; + + public RejectedPromise(Exception reason) { + m_reason = reason; + } + + public IPromise<T2> Cast<T2>() { + return (IPromise<T2>)(IPromise<T>)this; + } + + void IPromise.Join() { + m_reason.ThrowInvocationException(); + } + + void IPromise.Join(int timeout) { + m_reason.ThrowInvocationException(); + } + + public T Join() { + m_reason.ThrowInvocationException(); + throw new Exception(); // unreachable code + } + + public T Join(int timeout) { + m_reason.ThrowInvocationException(); + throw new Exception(); // unreachable code + } + + public void Then(IResolvable next) { + next.Reject(m_reason); + } + + public void Then(IResolvable<T> next) { + next.Reject(m_reason); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ResolvedPromise.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,30 @@ +using System; + +namespace Implab +{ + public struct ResolvedPromise : IPromise { + public Type ResultType => typeof(void); + + public bool IsResolved => true; + + public bool IsRejected => false; + + public bool IsFulfilled => true; + + public Exception RejectReason => null; + + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); + } + + public void Join() { + } + + public void Join(int timeout) { + } + + public void Then(IResolvable next) { + next.Resolve(); + } + } +} \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ResolvedPromise`1.cs Tue Jan 30 01:37:17 2018 +0300 @@ -0,0 +1,47 @@ +using System; + +namespace Implab { + public struct ResolvedPromise<T> : IPromise<T> { + T m_result; + + public Type ResultType => typeof(T); + + public bool IsResolved => true; + + public bool IsRejected => false; + + public bool IsFulfilled => true; + + public Exception RejectReason => null; + + public ResolvedPromise(T result) { + m_result = result; + } + + public IPromise<T2> Cast<T2>() { + return (IPromise<T2>)(IPromise<T>)this; + } + + void IPromise.Join() { + } + + void IPromise.Join(int timeout) { + } + + public T Join() { + return m_result; + } + + public T Join(int timeout) { + return m_result; + } + + public void Then(IResolvable<T> next) { + next.Resolve(m_result); + } + + public void Then(IResolvable next) { + next.Resolve(); + } + } +} \ No newline at end of file
--- a/Implab/Safe.cs Fri Jan 26 18:46:27 2018 +0300 +++ b/Implab/Safe.cs Tue Jan 30 01:37:17 2018 +0300 @@ -112,9 +112,9 @@ ArgumentNotNull(action, "action"); try { - return Promise<T>.FromResult(action()); + return Promise.Resolve(action()); } catch (Exception err) { - return Promise<T>.FromException(err); + return Promise.Reject<T>(err); } } @@ -124,9 +124,9 @@ try { action(); - return Promise.Success; + return Promise.Resolve(); } catch (Exception err) { - return new FailedPromise(err); + return Promise.Reject(err); } } @@ -135,9 +135,9 @@ ArgumentNotNull(action, "action"); try { - return action() ?? new FailedPromise(new Exception("The action returned null")); + return action() ?? Promise.Reject(new Exception("The action returned null")); } catch (Exception err) { - return new FailedPromise(err); + return Promise.Reject(err); } } @@ -149,9 +149,9 @@ ArgumentNotNull(action, "action"); try { - return action() ?? Promise<T>.FromException(new Exception("The action returned null")); + return action() ?? Promise.Reject<T>(new Exception("The action returned null")); } catch (Exception err) { - return Promise<T>.FromException(err); + return Promise.Reject<T>(err); } }