Mercurial > pub > ImplabNet
diff Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3
Working on promises
author | cin |
---|---|
date | Wed, 24 Jan 2018 03:03:21 +0300 |
parents | fa6cbf4d8841 |
children | b1e0ffdf3451 |
line wrap: on
line diff
--- a/Implab/AbstractEvent.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/AbstractEvent.cs Wed Jan 24 03:03:21 2018 +0300 @@ -2,56 +2,50 @@ using Implab.Parallels; using System.Threading; using System.Reflection; +using System.Diagnostics; namespace Implab { - public abstract class AbstractEvent<THandler> : ICancellable { + public abstract class AbstractEvent<THandler> where THandler : class { - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; + const int PENDING_SATE = 0; + protected const int TRANSITIONAL_STATE = 1; + protected const int SUCCEEDED_STATE = 2; protected const int REJECTED_STATE = 3; - protected const int CANCELLED_STATE = 4; - const int CANCEL_NOT_REQUESTED = 0; - const int CANCEL_REQUESTING = 1; - const int CANCEL_REQUESTED = 2; - - const int RESERVED_HANDLERS_COUNT = 4; - - int m_state; + volatile int m_state; Exception m_error; - int m_handlersCount; - //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - THandler[] m_handlers; + THandler m_handler; SimpleAsyncQueue<THandler> m_extraHandlers; - int m_handlerPointer = -1; - int m_handlersCommited; - - int m_cancelRequest; - Exception m_cancelationReason; #region state managment - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + protected bool BeginTransit() { + return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); } - void CompleteTransit(int state) { + protected void CompleteTransit(int state) { +#if DEBUG if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); +#else + m_state = state; +#endif + Signal(); } - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); + protected void WaitTransition() { + if (m_state == TRANSITIONAL_STATE) { + SpinWait spin; + do { + spin.SpinOnce(); + } while (m_state == TRANSITIONAL_STATE); } } protected bool BeginSetResult() { if (!BeginTransit()) { WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); return false; } return true; @@ -59,7 +53,6 @@ protected void EndSetResult() { CompleteTransit(SUCCEEDED_STATE); - Signal(); } @@ -78,8 +71,6 @@ if (BeginTransit()) { m_error = error; CompleteTransit(REJECTED_STATE); - - Signal(); } else { WaitTransition(); if (m_state == SUCCEEDED_STATE) @@ -87,58 +78,33 @@ } } - /// <summary> - /// Отменяет операцию, если это возможно. - /// </summary> - /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> - protected void SetCancelled(Exception reason) { - if (BeginTransit()) { - m_error = reason; - CompleteTransit(CANCELLED_STATE); - Signal(); - } - } - protected abstract void SignalHandler(THandler handler, int signal); void Signal() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalHandler(m_handlers[slot], m_state); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalHandler(handler, m_state); - } + THandler handler; + while (TryDequeueHandler(out handler)) + SignalHandler(handler, m_state); } #endregion - protected abstract Signal GetResolveSignal(); + protected abstract Signal GetFulfillSignal(); #region synchronization traits protected void WaitResult(int timeout) { - if (!(IsResolved || GetResolveSignal().Wait(timeout))) + if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) throw new TimeoutException(); - switch (m_state) { - case SUCCEEDED_STATE: - return; - case CANCELLED_STATE: - throw new OperationCanceledException("The operation has been cancelled", m_error); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); - } + if (IsRejected) + Rethrow(); + } + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); } #endregion @@ -150,149 +116,55 @@ // the promise is in the resolved state, just invoke the handler SignalHandler(handler, m_state); } else { - var slot = Interlocked.Increment(ref m_handlersCount) - 1; - - if (slot < RESERVED_HANDLERS_COUNT) { - - if (slot == 0) { - m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - } else { - while (m_handlers == null) - Thread.MemoryBarrier(); - } - - m_handlers[slot] = handler; - - while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { - } + if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { + if (m_extraHandlers == null) + // compare-exchange will fprotect from loosing already created queue + Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); + m_extraHandlers.Enqueue(handler); + } - if (m_state > 1) { - do { - var hp = m_handlerPointer; - slot = hp + 1; - if (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) - continue; - SignalHandler(m_handlers[slot], m_state); - } - break; - } while(true); - } - } else { - if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new SimpleAsyncQueue<THandler>(); - } else { - while (m_extraHandlers == null) - Thread.MemoryBarrier(); - } - - m_extraHandlers.Enqueue(handler); - - if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) + if (m_state > 1 && TryDequeueHandler(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) - SignalHandler(handler, m_state); - } + SignalHandler(handler, m_state); } + + } + + bool TryDequeueHandler(out THandler handler) { + handler = Interlocked.Exchange(ref m_handler, null); + if (handler != null) + return true; + return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler); } #endregion #region IPromise implementation - public bool IsResolved { + public bool IsFulfilled { get { - Thread.MemoryBarrier(); - return m_state > 1; + return m_state > TRANSITIONAL_STATE; } } - public bool IsCancelled { + public bool IsRejected { get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; + return m_state == REJECTED_STATE; } } #endregion - public Exception Error { + public Exception RejectReason { get { return m_error; } } - public bool CancelOperationIfRequested() { - if (IsCancellationRequested) { - CancelOperation(CancellationReason); - return true; - } - return false; - } - - public virtual void CancelOperation(Exception reason) { - SetCancelled(reason); - } - - public void CancellationRequested(Action<Exception> handler) { - Safe.ArgumentNotNull(handler, "handler"); - if (IsCancellationRequested) - handler(CancellationReason); - - if (m_cancelationHandlers == null) - Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); - - m_cancelationHandlers.Enqueue(handler); - - if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) - // TryDeque implies MemoryBarrier() - handler(m_cancelationReason); - } - - public bool IsCancellationRequested { - get { - do { - if (m_cancelRequest == CANCEL_NOT_REQUESTED) - return false; - if (m_cancelRequest == CANCEL_REQUESTED) - return true; - Thread.MemoryBarrier(); - } while(true); - } - } - - public Exception CancellationReason { - get { - do { - Thread.MemoryBarrier(); - } while(m_cancelRequest == CANCEL_REQUESTING); - - return m_cancelationReason; - } - } - - #region ICancellable implementation - - public void Cancel() { - Cancel(null); - } - - public void Cancel(Exception reason) { - if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { - m_cancelationReason = reason; - m_cancelRequest = CANCEL_REQUESTED; - if (m_cancelationHandlers != null) { - Action<Exception> handler; - while (m_cancelationHandlers.TryDequeue(out handler)) - handler(m_cancelationReason); - } - } - } - - #endregion } }