Mercurial > pub > ImplabNet
changeset 243:b1e0ffdf3451 v3
working on promises
author | cin |
---|---|
date | Wed, 24 Jan 2018 19:24:10 +0300 (2018-01-24) |
parents | cbe10ac0731e |
children | eee3e49dd1ff |
files | Implab/AbstractEvent.cs Implab/AbstractPromise.cs Implab/IPromise.cs Implab/Promise.cs Implab/PromiseState.cs |
diffstat | 5 files changed, 158 insertions(+), 119 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab/AbstractEvent.cs Wed Jan 24 03:03:21 2018 +0300 +++ b/Implab/AbstractEvent.cs Wed Jan 24 19:24:10 2018 +0300 @@ -5,28 +5,61 @@ using System.Diagnostics; namespace Implab { + /// <summary> + /// Abstract class for creation of custom one-shot thread safe events. + /// </summary> + /// <remarks> + /// <para> + /// An event is something that should happen in the future and the + /// triggering of the event causes execution of some pending actions + /// which are formely event handlers. One-shot events occur only once + /// and any handler added after the event is triggered should run + /// without a delay. + /// </para> + /// <para> + /// The lifecycle of the one-shot event is tipically consists of following + /// phases. + /// <list> + /// <description>Pending state. This is the initial state of the event. Any + /// handler added to the event will be queued for the future execution. + /// </description> + /// <description>Transitional state. This is intermediate state between pending + /// and fulfilled states, during this state internal initialization and storing + /// of the result occurs. + /// </description> + /// <description>Fulfilled state. The event contains the result, all queued + /// handlers are signalled to run and newly added handlers are executed + /// immediatelly. + /// </description> + /// </list> + /// </para> + /// </remarks> public abstract class AbstractEvent<THandler> where THandler : class { - const int PENDING_SATE = 0; - protected const int TRANSITIONAL_STATE = 1; - protected const int SUCCEEDED_STATE = 2; - protected const int REJECTED_STATE = 3; + const int TRANSITIONAL_STATE = 1; + + const int FULFILLED_STATE = 2; volatile int m_state; - Exception m_error; THandler m_handler; SimpleAsyncQueue<THandler> m_extraHandlers; + public bool IsFulfilled { + get { + return m_state > TRANSITIONAL_STATE; + } + } + #region state managment protected bool BeginTransit() { return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); } - protected void CompleteTransit(int state) { + protected void CompleteTransit() { #if DEBUG - if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, FULFILLED_STATE, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); #else m_state = state; @@ -43,47 +76,13 @@ } } - protected bool BeginSetResult() { - if (!BeginTransit()) { - WaitTransition(); - return false; - } - return true; - } - protected void EndSetResult() { - CompleteTransit(SUCCEEDED_STATE); - } - - - - /// <summary> - /// Выполняет обещание, сообщая об ошибке - /// </summary> - /// <remarks> - /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков - /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные - /// будут проигнорированы. - /// </remarks> - /// <param name="error">Исключение возникшее при выполнении операции</param> - /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> - protected void SetError(Exception error) { - if (BeginTransit()) { - m_error = error; - CompleteTransit(REJECTED_STATE); - } else { - WaitTransition(); - if (m_state == SUCCEEDED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - protected abstract void SignalHandler(THandler handler, int signal); + protected abstract void SignalHandler(THandler handler); void Signal() { THandler handler; while (TryDequeueHandler(out handler)) - SignalHandler(handler, m_state); + SignalHandler(handler); } #endregion @@ -94,46 +93,41 @@ protected void WaitResult(int timeout) { if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) throw new TimeoutException(); - - if (IsRejected) - Rethrow(); } - protected void Rethrow() { - Debug.Assert(m_error != null); - if (m_error is OperationCanceledException) - throw new OperationCanceledException("Operation cancelled", m_error); - else - throw new TargetInvocationException(m_error); - } + #endregion #region handlers managment protected void AddHandler(THandler handler) { - if (m_state > 1) { + if (IsFulfilled) { // the promise is in the resolved state, just invoke the handler - SignalHandler(handler, m_state); + SignalHandler(handler); } else { - if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { - if (m_extraHandlers == null) - // compare-exchange will fprotect from loosing already created queue - Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); - m_extraHandlers.Enqueue(handler); - } + EnqueueHandler(handler); - if (m_state > 1 && TryDequeueHandler(out handler)) + if (IsFulfilled && TryDequeueHandler(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) - SignalHandler(handler, m_state); + SignalHandler(handler); } } + void EnqueueHandler(THandler handler) { + if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { + if (m_extraHandlers == null) + // compare-exchange will protect from loosing already created queue + Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); + m_extraHandlers.Enqueue(handler); + } + } + bool TryDequeueHandler(out THandler handler) { handler = Interlocked.Exchange(ref m_handler, null); if (handler != null) @@ -142,29 +136,6 @@ } #endregion - - #region IPromise implementation - - public bool IsFulfilled { - get { - return m_state > TRANSITIONAL_STATE; - } - } - - public bool IsRejected { - get { - return m_state == REJECTED_STATE; - } - } - - #endregion - - public Exception RejectReason { - get { - return m_error; - } - } - } }
--- a/Implab/AbstractPromise.cs Wed Jan 24 03:03:21 2018 +0300 +++ b/Implab/AbstractPromise.cs Wed Jan 24 19:24:10 2018 +0300 @@ -1,50 +1,77 @@ using System; +using System.Diagnostics; +using System.Reflection; using Implab.Parallels; namespace Implab { public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise { public class HandlerDescriptor { - readonly Action m_handler; - readonly Action<Exception> m_error; + readonly Action m_resolve; + readonly Action<Exception> m_reject; + + readonly IDeferred m_deferred; public HandlerDescriptor(Action success, Action<Exception> error) { - m_handler = success; - m_error = error; + m_resolve = success; + m_reject = error; } public void SignalSuccess() { - if (m_handler != null) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } + try { + if (m_resolve != null) + m_resolve(); + m_deferred.Resolve(); + } catch (Exception ex) { + m_deferred.Reject(ex); } } public void SignalError(Exception err) { - if (m_error != null) { + if (m_reject != null) { try { - m_error(err); - // Analysis disable once EmptyGeneralCatchClause - } catch { + m_reject(err); + m_deferred.Resolve(); + } catch (Exception ex) { + m_deferred.Reject(ex); } } } } + PromiseState m_state; + + Exception m_error; + + public bool IsRejected { + get { + return m_state == PromiseState.Rejected; + } + } + + public bool IsResolved { + get { + return m_state == PromiseState.Resolved; + } + } + + public Exception RejectReason { + get { + return m_error; + } + } + #region implemented abstract members of AbstractPromise - protected override void SignalHandler(HandlerDescriptor handler, int signal) { - switch (signal) { - case SUCCEEDED_STATE: + protected override void SignalHandler(HandlerDescriptor handler) { + switch (m_state) { + case PromiseState.Resolved: handler.SignalSuccess(); break; - case REJECTED_STATE: + case PromiseState.Rejected: handler.SignalError(RejectReason); break; default: - throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal)); + throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", m_state)); } } @@ -56,12 +83,47 @@ #endregion + protected void CompleteResolve() { + m_state = PromiseState.Resolved; + CompleteTransit(); + } + public Type ResultType { get { return typeof(void); } } + /// <summary> + /// Выполняет обещание, сообщая об ошибке + /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> + /// <param name="error">Исключение возникшее при выполнении операции</param> + /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> + protected void SetError(Exception error) { + if (BeginTransit()) { + m_error = error; + m_state = PromiseState.Rejected; + CompleteTransit(); + } else { + WaitTransition(); + if (m_state == PromiseState.Resolved) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); + } + public void On(Action success, Action<Exception> error) { AddHandler(new HandlerDescriptor(success, error)); } @@ -72,16 +134,13 @@ public void Join() { WaitResult(-1); + if (IsRejected) + Rethrow(); } public void Join(int timeout) { WaitResult(timeout); } - - protected void SetResult() { - if(BeginSetResult()) - EndSetResult(); - } } }
--- a/Implab/IPromise.cs Wed Jan 24 03:03:21 2018 +0300 +++ b/Implab/IPromise.cs Wed Jan 24 19:24:10 2018 +0300 @@ -4,7 +4,7 @@ using System.Text; namespace Implab { - public interface IPromise: ICancellable { + public interface IPromise { /// <summary> /// Тип результата, получаемого через данное обещание.
--- a/Implab/Promise.cs Wed Jan 24 03:03:21 2018 +0300 +++ b/Implab/Promise.cs Wed Jan 24 19:24:10 2018 +0300 @@ -2,24 +2,24 @@ using Implab.Parallels; namespace Implab { - public class Promise : AbstractPromise, IDeferred { + public class Promise : AbstractPromise { public static readonly IPromise Success; static Promise() { Success = new SuccessPromise(); } - public void Resolve() { + internal void ResolvePromise() { SetResult(); } - public void Reject(Exception error) { + internal void RejectPromise(Exception error) { SetError(error); - } - - public static IPromise FromException(Exception exception) { - return new FailedPromise(exception); - } + } + + public static IPromise Reject(Exception exception) { + return new FailedPromise(exception); + } } }