Mercurial > pub > ImplabNet
view Implab/AbstractEvent.cs @ 233:d6fe09f5592c v2
Improved AsyncQueue
Removed ImplabFx
author | cin |
---|---|
date | Wed, 04 Oct 2017 15:44:47 +0300 |
parents | 75103928da09 |
children | fa6cbf4d8841 |
line wrap: on
line source
using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;

namespace Implab {
    /// <summary>
    /// Base class for a one-shot event (promise) which can be resolved exactly once
    /// as succeeded, rejected or cancelled, and which notifies the registered handlers
    /// about the final state.
    /// </summary>
    /// <remarks>
    /// The state is changed without locks: a resolver first moves the event from the
    /// unresolved state to a short-lived transitional state with a compare-and-swap,
    /// stores the result and then publishes the final state. Other threads which
    /// observe the transitional state spin until the final state is published.
    /// </remarks>
    /// <typeparam name="THandler">
    /// The type of the handlers stored by the event; how a handler is invoked is
    /// defined by the descendant in <see cref="SignalHandler"/>.
    /// </typeparam>
    public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {

        // Resolution states. Every value greater than TRANSITIONAL_STATE is final.
        const int UNRESOLVED_STATE = 0;
        const int TRANSITIONAL_STATE = 1;
        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;
        protected const int CANCELLED_STATE = 4;

        // States of the cancellation request, tracked independently of the resolution state.
        const int CANCEL_NOT_REQUESTED = 0;
        const int CANCEL_REQUESTING = 1;
        const int CANCEL_REQUESTED = 2;

        // The number of handlers kept in the fixed array before the overflow queue is used.
        const int RESERVED_HANDLERS_COUNT = 4;

        int m_state;
        Exception m_error;

        // The number of handler slots handed out so far (including overflow ones).
        int m_handlersCount;

        // The first RESERVED_HANDLERS_COUNT handlers; allocated by the thread which gets slot 0.
        THandler[] m_handlers;
        // Handlers beyond the reserved slots; allocated by the thread which gets the first overflow slot.
        SimpleAsyncQueue<THandler> m_extraHandlers;
        // The index of the last reserved slot which has been claimed for signalling.
        int m_handlerPointer = -1;
        // The number of reserved slots whose handlers are fully written and visible.
        int m_handlersCommited;

        int m_cancelRequest;
        Exception m_cancelationReason;
        SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers;

        #region state management

        /// <summary>
        /// Tries to move the event from the unresolved state to the transitional state.
        /// </summary>
        /// <returns><c>true</c> if the calling thread has acquired the transition.</returns>
        bool BeginTransit() {
            return UNRESOLVED_STATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_STATE);
        }

        /// <summary>
        /// Publishes the final state; must be called by the thread which acquired the transition.
        /// </summary>
        /// <param name="state">The final state to publish.</param>
        /// <exception cref="InvalidOperationException">The event isn't in the transitional state.</exception>
        void CompleteTransit(int state) {
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Spins until another thread leaves the transitional state.
        /// </summary>
        void WaitTransition() {
            while (m_state == TRANSITIONAL_STATE) {
                Thread.MemoryBarrier();
            }
        }

        /// <summary>
        /// Starts resolving the event with a successful result.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the caller has acquired the transition and must finish it with
        /// <see cref="EndSetResult"/>; <c>false</c> if the event has already been cancelled.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// The event is already resolved with a state other than cancelled.
        /// </exception>
        protected bool BeginSetResult() {
            if (!BeginTransit()) {
                WaitTransition();
                if (m_state != CANCELLED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
                return false;
            }
            return true;
        }

        /// <summary>
        /// Completes the transition started by <see cref="BeginSetResult"/>: marks the event
        /// as succeeded and signals the handlers.
        /// </summary>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
            Signal();
        }

        /// <summary>
        /// Resolves the promise with an error.
        /// </summary>
        /// <remarks>
        /// Since the promise has to work in a multithreaded environment, several threads may
        /// report an error at the same time; only the first one is used as the result. If the
        /// promise has already been rejected or cancelled the call is ignored.
        /// </remarks>
        /// <param name="error">The exception which occurred during the operation.</param>
        /// <exception cref="InvalidOperationException">The promise has already been resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                m_error = error;
                CompleteTransit(REJECTED_STATE);

                Signal();
            } else {
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Cancels the operation if it is still possible.
        /// </summary>
        /// <remarks>
        /// The call is silently ignored when the event is already resolved; use
        /// <see cref="IsCancelled"/> to find out whether the operation has been cancelled.
        /// </remarks>
        /// <param name="reason">The reason of the cancellation; it is stored as the error of the event.</param>
        protected void SetCancelled(Exception reason) {
            if (BeginTransit()) {
                m_error = reason;
                CompleteTransit(CANCELLED_STATE);
                Signal();
            }
        }

        /// <summary>
        /// Invokes the specified handler for the specified final state.
        /// </summary>
        /// <param name="handler">The handler to invoke.</param>
        /// <param name="signal">The state of the event passed by the caller (the current value of the state field).</param>
        protected abstract void SignalHandler(THandler handler, int signal);

        /// <summary>
        /// Signals all handlers which are registered and not yet claimed by another thread.
        /// </summary>
        void Signal() {
            var hp = m_handlerPointer;
            var slot = hp + 1;
            while (slot < m_handlersCommited) {
                // claim the slot with a CAS so that every reserved handler is signalled only once
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalHandler(m_handlers[slot], m_state);
                }
                hp = m_handlerPointer;
                slot = hp + 1;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalHandler(handler, m_state);
            }
        }

        #endregion

        /// <summary>
        /// Gets the signal which is used by <see cref="WaitResult"/> to wait for the resolution.
        /// </summary>
        protected abstract Signal GetResolveSignal();

        #region synchronization traits

        /// <summary>
        /// Waits until the event is resolved and converts a non-successful state to an exception.
        /// </summary>
        /// <param name="timeout">
        /// The timeout passed as is to the resolve signal (presumably milliseconds - see the Signal class).
        /// </param>
        /// <exception cref="TimeoutException">The event isn't resolved and the wait on the signal has failed.</exception>
        /// <exception cref="OperationCanceledException">The event is cancelled.</exception>
        /// <exception cref="TargetInvocationException">The event is rejected; the inner exception is the stored error.</exception>
        /// <exception cref="ApplicationException">The event is in an unexpected state.</exception>
        protected void WaitResult(int timeout) {
            if (!(IsResolved || GetResolveSignal().Wait(timeout)))
                throw new TimeoutException();

            switch (m_state) {
                case SUCCEEDED_STATE:
                    return;
                case CANCELLED_STATE:
                    throw new OperationCanceledException("The operation has been cancelled", m_error);
                case REJECTED_STATE:
                    throw new TargetInvocationException(m_error);
                default:
                    throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
            }
        }

        #endregion

        #region handlers management

        /// <summary>
        /// Registers the handler; if the event is already resolved the handler is invoked immediately.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {

            if (m_state > TRANSITIONAL_STATE) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                var slot = Interlocked.Increment(ref m_handlersCount) - 1;

                if (slot < RESERVED_HANDLERS_COUNT) {

                    if (slot == 0) {
                        // the owner of the first slot allocates the array
                        m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
                    } else {
                        // the others spin until the array becomes visible
                        while (m_handlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_handlers[slot] = handler;

                    // commit the slots strictly in order: wait until all previous slots are commited
                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
                    }

                    if (m_state > TRANSITIONAL_STATE) {
                        // the event has been resolved while the handler was being added, the
                        // resolver may have missed it, therefore try to claim and signal the
                        // next unclaimed slot (it isn't necessarily the one added here)
                        do {
                            var hp = m_handlerPointer;
                            slot = hp + 1;
                            if (slot < m_handlersCommited) {
                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
                                    continue;
                                SignalHandler(m_handlers[slot], m_state);
                            }
                            break;
                        } while (true);
                    }
                } else {
                    if (slot == RESERVED_HANDLERS_COUNT) {
                        // the owner of the first overflow slot allocates the queue
                        m_extraHandlers = new SimpleAsyncQueue<THandler>();
                    } else {
                        while (m_extraHandlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_extraHandlers.Enqueue(handler);

                    if (m_state > TRANSITIONAL_STATE && m_extraHandlers.TryDequeue(out handler))
                        // if the promise has been resolved while we were adding the handler to the queue
                        // we can't guarantee that someone is still processing it,
                        // therefore we need to fetch a handler from the queue and execute it.
                        // Note that the fetched handler may be not the one that we have added,
                        // we may even fetch no handlers at all :)
                        SignalHandler(handler, m_state);
                }
            }
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets a value indicating whether the event has reached one of the final states.
        /// </summary>
        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                return m_state > TRANSITIONAL_STATE;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the event has been resolved as cancelled.
        /// </summary>
        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                return m_state == CANCELLED_STATE;
            }
        }

        #endregion

        /// <summary>
        /// Gets the exception stored when the event was rejected or cancelled.
        /// </summary>
        public Exception Error {
            get {
                return m_error;
            }
        }

        /// <summary>
        /// Cancels the operation if the cancellation has been requested.
        /// </summary>
        /// <returns><c>true</c> if the cancellation has been requested and <see cref="CancelOperation"/> was called.</returns>
        public bool CancelOperationIfRequested() {
            if (IsCancellationRequested) {
                CancelOperation(CancellationReason);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Cancels the operation; the default implementation tries to resolve the event as cancelled.
        /// </summary>
        /// <param name="reason">The reason of the cancellation.</param>
        public virtual void CancelOperation(Exception reason) {
            SetCancelled(reason);
        }

        /// <summary>
        /// Registers the handler which is invoked when the cancellation is requested.
        /// </summary>
        /// <remarks>
        /// If the cancellation has already been requested the handler is invoked immediately
        /// and only once.
        /// </remarks>
        /// <param name="handler">The handler which receives the reason of the cancellation.</param>
        public void CancellationRequested(Action<Exception> handler) {
            Safe.ArgumentNotNull(handler, "handler");

            if (IsCancellationRequested) {
                // The handler is notified right here, it mustn't be enqueued after that,
                // otherwise the check at the end of this method would dequeue and invoke
                // it for the second time.
                handler(CancellationReason);
                return;
            }

            if (m_cancelationHandlers == null)
                Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);

            m_cancelationHandlers.Enqueue(handler);

            // The cancellation could have been requested while the handler was being enqueued,
            // in this case Cancel() may have already drained the queue, so take one handler
            // (not necessarily the one added above) and invoke it.
            if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
                // TryDequeue implies MemoryBarrier()
                handler(m_cancelationReason);
        }

        /// <summary>
        /// Gets a value indicating whether the cancellation has been requested; spins while
        /// a request is being registered by another thread.
        /// </summary>
        public bool IsCancellationRequested {
            get {
                do {
                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
                        return false;
                    if (m_cancelRequest == CANCEL_REQUESTED)
                        return true;
                    Thread.MemoryBarrier();
                } while (true);
            }
        }

        /// <summary>
        /// Gets the reason passed to <see cref="Cancel(Exception)"/>; spins while a request
        /// is being registered by another thread.
        /// </summary>
        public Exception CancellationReason {
            get {
                do {
                    Thread.MemoryBarrier();
                } while (m_cancelRequest == CANCEL_REQUESTING);

                return m_cancelationReason;
            }
        }

        #region ICancellable implementation

        /// <summary>
        /// Requests the cancellation without a reason.
        /// </summary>
        public void Cancel() {
            Cancel(null);
        }

        /// <summary>
        /// Requests the cancellation; only the first request takes effect, it stores the
        /// reason and invokes the queued cancellation handlers.
        /// </summary>
        /// <param name="reason">The reason of the cancellation, may be <c>null</c>.</param>
        public void Cancel(Exception reason) {
            if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
                m_cancelationReason = reason;
                m_cancelRequest = CANCEL_REQUESTED;
                if (m_cancelationHandlers != null) {
                    Action<Exception> handler;
                    while (m_cancelationHandlers.TryDequeue(out handler))
                        handler(m_cancelationReason);
                }
            }
        }

        #endregion
    }
}