Mercurial > pub > ImplabNet
view Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3
Working on promises
author | cin |
---|---|
date | Wed, 24 Jan 2018 03:03:21 +0300 |
parents | fa6cbf4d8841 |
children | b1e0ffdf3451 |
line wrap: on
line source
using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;
using System.Diagnostics;

namespace Implab {
    /// <summary>
    /// Base class for a one-shot event that moves from the pending state through a short
    /// transitional state to either the succeeded or the rejected state, and notifies
    /// the registered handlers once the final state is set.
    /// </summary>
    /// <typeparam name="THandler">
    /// The handler type. Handlers are stored by this class and are invoked through
    /// <see cref="SignalHandler"/>, which the derived class implements.
    /// </typeparam>
    public abstract class AbstractEvent<THandler> where THandler : class {
        // The numeric order of the states matters: every value greater than
        // TRANSITIONAL_STATE denotes a completed event (see IsFulfilled).
        const int PENDING_STATE = 0;
        protected const int TRANSITIONAL_STATE = 1;
        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;

        // Current state, one of the *_STATE constants above.
        volatile int m_state;

        // The rejection reason; assigned in SetError before the state becomes REJECTED_STATE.
        Exception m_error;

        // Slot for the first registered handler, avoids allocating the queue for a single handler.
        THandler m_handler;

        // Lazily created queue for handlers which did not fit into the m_handler slot.
        SimpleAsyncQueue<THandler> m_extraHandlers;

        #region state managment

        /// <summary>
        /// Tries to move the event from the pending state to the transitional state.
        /// </summary>
        /// <returns>
        /// <c>true</c> if this call performed the transition; <c>false</c> if the event
        /// was not in the pending state.
        /// </returns>
        protected bool BeginTransit() {
            return PENDING_STATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_STATE);
        }

        /// <summary>
        /// Leaves the transitional state by storing the specified final state and then
        /// signals the registered handlers.
        /// </summary>
        /// <param name="state">The state to store as the new state of the event.</param>
        /// <exception cref="InvalidOperationException">
        /// DEBUG builds only: the event is not in the transitional state.
        /// </exception>
        protected void CompleteTransit(int state) {
#if DEBUG
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
#else
            m_state = state;
#endif
            Signal();
        }

        /// <summary>
        /// Spins while the event is in the transitional state; returns immediately otherwise.
        /// </summary>
        protected void WaitTransition() {
            // SpinWait is a struct: it must be explicitly initialized before the first
            // SpinOnce() call to satisfy the definite assignment rules.
            SpinWait spin = new SpinWait();
            while (m_state == TRANSITIONAL_STATE)
                spin.SpinOnce();
        }

        /// <summary>
        /// Tries to start setting the result of the event.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the caller moved the event to the transitional state and should
        /// finish with <see cref="EndSetResult"/>; <c>false</c> if the event was not pending,
        /// in which case the method returns only after any running transition has finished.
        /// </returns>
        protected bool BeginSetResult() {
            if (BeginTransit())
                return true;

            WaitTransition();
            return false;
        }

        /// <summary>
        /// Completes the transition started by <see cref="BeginSetResult"/> by moving the
        /// event to the succeeded state.
        /// </summary>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
        }

        /// <summary>
        /// Completes the promise by reporting an error.
        /// </summary>
        /// <remarks>
        /// Since the promise has to work in a multithreaded environment, several threads
        /// may report an error at the same time. Only the first error is used as the
        /// result, the others are ignored.
        /// </remarks>
        /// <param name="error">The exception which occurred during the operation.</param>
        /// <exception cref="InvalidOperationException">The promise is already resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                // the error is stored before the final state is published
                m_error = error;
                CompleteTransit(REJECTED_STATE);
            } else {
                WaitTransition();
                // an already rejected promise silently ignores subsequent errors
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Invokes the specified handler; implemented by the derived class.
        /// </summary>
        /// <param name="handler">The handler to notify.</param>
        /// <param name="signal">The value of the event state at the moment of the notification.</param>
        protected abstract void SignalHandler(THandler handler, int signal);

        /// <summary>
        /// Dequeues the registered handlers one by one and passes each of them to
        /// <see cref="SignalHandler"/> together with the current state.
        /// </summary>
        void Signal() {
            THandler handler;
            while (TryDequeueHandler(out handler))
                SignalHandler(handler, m_state);
        }

        #endregion

        /// <summary>
        /// Gets the signal which <see cref="WaitResult"/> waits on when the event is not
        /// completed yet; implemented by the derived class.
        /// </summary>
        protected abstract Signal GetFulfillSignal();

        #region synchronization traits

        /// <summary>
        /// Waits until the event is completed and rethrows the stored error if the event
        /// was rejected.
        /// </summary>
        /// <param name="timeout">
        /// The timeout passed as is to the signal returned by <see cref="GetFulfillSignal"/>
        /// (presumably in milliseconds - confirm against the Signal implementation).
        /// </param>
        /// <exception cref="TimeoutException">
        /// The event is not completed and waiting on the signal returned <c>false</c>.
        /// </exception>
        protected void WaitResult(int timeout) {
            // the signal is requested only when the event isn't completed yet
            if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
                throw new TimeoutException();

            if (IsRejected)
                Rethrow();
        }

        /// <summary>
        /// Throws an exception which wraps the stored error.
        /// </summary>
        /// <exception cref="OperationCanceledException">
        /// The stored error is an <see cref="OperationCanceledException"/>.
        /// </exception>
        /// <exception cref="TargetInvocationException">The stored error is of any other type.</exception>
        protected void Rethrow() {
            Debug.Assert(m_error != null);

            // the original error is kept as the inner exception in both cases
            if (m_error is OperationCanceledException)
                throw new OperationCanceledException("Operation cancelled", m_error);
            else
                throw new TargetInvocationException(m_error);
        }

        #endregion

        #region handlers managment

        /// <summary>
        /// Registers a handler. If the event is already completed the handler is invoked
        /// immediately, otherwise it is stored until the event is completed.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {
            if (m_state > TRANSITIONAL_STATE) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                // the first handler goes to the dedicated slot, the rest go to the queue
                if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
                    if (m_extraHandlers == null)
                        // compare-exchange protects from losing an already created queue
                        Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
                    m_extraHandlers.Enqueue(handler);
                }

                // If the promise has been resolved while we were adding the handler to
                // the queue, we can't guarantee that someone is still processing it,
                // therefore we need to fetch a handler from the queue and execute it.
                // Note that the fetched handler may be not the one that we have added,
                // we can even fetch no handlers at all.
                if (m_state > TRANSITIONAL_STATE && TryDequeueHandler(out handler))
                    SignalHandler(handler, m_state);
            }
        }

        /// <summary>
        /// Takes one stored handler: the dedicated slot is checked first, then the queue
        /// of extra handlers.
        /// </summary>
        /// <param name="handler">Receives the dequeued handler when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a handler has been taken; otherwise <c>false</c>.</returns>
        bool TryDequeueHandler(out THandler handler) {
            // atomically take and clear the slot so the handler can't be taken twice
            handler = Interlocked.Exchange(ref m_handler, null);
            if (handler != null)
                return true;

            return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets whether the event has reached a final state, either succeeded or rejected.
        /// </summary>
        public bool IsFulfilled {
            get {
                return m_state > TRANSITIONAL_STATE;
            }
        }

        /// <summary>
        /// Gets whether the event is in the rejected state.
        /// </summary>
        public bool IsRejected {
            get {
                return m_state == REJECTED_STATE;
            }
        }

        #endregion

        /// <summary>
        /// Gets the error stored by <see cref="SetError"/>, or <c>null</c> if none has been stored.
        /// </summary>
        public Exception RejectReason {
            get {
                return m_error;
            }
        }
    }
}