Mercurial > pub > ImplabNet
diff Implab/AbstractEvent.cs @ 144:8c0b95069066 v2
DRAFT: refactoring
author | cin |
---|---|
date | Fri, 06 Mar 2015 15:45:26 +0300 |
parents | |
children | 706fccb85524 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/AbstractEvent.cs Fri Mar 06 15:45:26 2015 +0300 @@ -0,0 +1,350 @@ +using System; +using Implab.Parallels; +using System.Threading; +using System.Reflection; + +namespace Implab { + public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable { + + const int UNRESOLVED_SATE = 0; + const int TRANSITIONAL_STATE = 1; + const int SUCCEEDED_STATE = 2; + const int REJECTED_STATE = 3; + const int CANCELLED_STATE = 4; + + const int CANCEL_NOT_REQUESTED = 0; + const int CANCEL_REQUESTING = 1; + const int CANCEL_REQUESTED = 2; + + const int RESERVED_HANDLERS_COUNT = 4; + + int m_state; + Exception m_error; + int m_handlersCount; + + readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; + MTQueue<THandler> m_extraHandlers; + int m_handlerPointer = -1; + int m_handlersCommited; + + int m_cancelRequest; + Exception m_cancelationReason; + MTQueue<Action<Exception>> m_cancelationHandlers; + + + #region state managment + bool BeginTransit() { + return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + } + + void CompleteTransit(int state) { + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); + } + + void WaitTransition() { + while (m_state == TRANSITIONAL_STATE) { + Thread.MemoryBarrier(); + } + } + + protected bool BeginSetResult() { + if (!BeginTransit()) { + WaitTransition(); + if (m_state != CANCELLED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + return false; + } + return true; + } + + protected void EndSetResult() { + CompleteTransit(SUCCEEDED_STATE); + OnSuccess(); + } + + + + /// <summary> + /// Выполняет обещание, сообщая об ошибке + /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> + /// <param name="error">Исключение возникшее при выполнении операции</param> + /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> + protected void SetError(Exception error) { + if (BeginTransit()) { + if (error is OperationCanceledException) { + CompleteTransit(CANCELLED_STATE); + m_error = error.InnerException; + OnCancelled(); + } else { + m_error = error is PromiseTransientException ? error.InnerException : error; + CompleteTransit(REJECTED_STATE); + OnError(); + } + } else { + WaitTransition(); + if (m_state == SUCCEEDED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + /// <summary> + /// Отменяет операцию, если это возможно. + /// </summary> + /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> + protected void SetCancelled(Exception reason) { + if (BeginTransit()) { + m_error = reason; + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } + } + + protected abstract void SignalSuccess(THandler handler); + + protected abstract void SignalError(THandler handler, Exception error); + + protected abstract void SignalCancelled(THandler handler, Exception reason); + + void OnSuccess() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalSuccess(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalSuccess(handler); + } + } + + void OnError() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalError(m_handlers[slot],m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalError(handler, m_error); + } + } + + void OnCancelled() { + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalCancelled(m_handlers[slot], m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalCancelled(handler, m_error); + } + } + + #endregion + + protected abstract Signal GetResolveSignal(); + + #region synchronization traits + protected void WaitResult(int timeout) { + if (!IsResolved) + GetResolveSignal().Wait(timeout); + + switch (m_state) { + case SUCCEEDED_STATE: + return; + case CANCELLED_STATE: + throw new OperationCanceledException(); + case REJECTED_STATE: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); + } + } + #endregion + + #region handlers managment + + protected void AddHandler(THandler handler) { + + if (m_state > 1) { + // the promise is in the resolved state, just invoke the handler + InvokeHandler(handler); + } else { + var slot = Interlocked.Increment(ref m_handlersCount) - 1; + + if (slot < RESERVED_HANDLERS_COUNT) { + + m_handlers[slot] = handler; + + while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { + } + + if (m_state > 1) { + do { + var hp = m_handlerPointer; + slot = hp + 1; + if (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) + continue; + InvokeHandler(m_handlers[slot]); + } + break; + } while(true); + } + } else { + if (slot == RESERVED_HANDLERS_COUNT) { + m_extraHandlers = new MTQueue<THandler>(); + } else { + while (m_extraHandlers == null) + Thread.MemoryBarrier(); + } + + m_extraHandlers.Enqueue(handler); + + if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) + // if the promise have been resolved while we was adding the handler to the queue + // we can't guarantee that someone is still processing it + // therefore we need to fetch a handler from the queue and execute it + // note that fetched handler may be not the one that we have added + // even we can fetch no handlers at all :) + InvokeHandler(handler); + } + } + } + + protected void InvokeHandler(THandler handler) { + switch (m_state) { + case SUCCEEDED_STATE: + SignalSuccess(handler); + break; + case CANCELLED_STATE: + SignalCancelled(handler, m_error); + break; + case REJECTED_STATE: + SignalError(handler, m_error); + break; + default: + throw new Exception(String.Format("Invalid promise state {0}", m_state)); + } + } + + #endregion + + #region IPromise implementation + + public bool IsResolved { + get { + Thread.MemoryBarrier(); + return m_state > 1; + } + } + + public bool IsCancelled { + get { + Thread.MemoryBarrier(); + return m_state == CANCELLED_STATE; + } + } + + #endregion + + public Exception Error { + get { + return m_error; + } + } + + public bool AcceptIfRequested() { + if (IsCancelRequested) + CancelOperation(CancelReason); + } + + public virtual void CancelOperation(Exception reason) { + SetCancelled(reason); + } + + public void CancelationRequested(Action<Exception> handler) { + Safe.ArgumentNotNull(handler, "handler"); + if (IsCancelRequested) + handler(CancelReason); + + if (m_cancelationHandlers == null) + Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); + + m_cancelationHandlers.Enqueue(handler); + + if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler)) + // TryDeque implies MemoryBarrier() + handler(m_cancelationReason); + } + + public bool IsCancelRequested { + get { + do { + if (m_cancelRequest == CANCEL_NOT_REQUESTED) + return false; + if (m_cancelRequest == CANCEL_REQUESTED) + return true; + Thread.MemoryBarrier(); + } while(true); + } + } + + public Exception CancelReason { + get { + do { + Thread.MemoryBarrier(); + } while(m_cancelRequest == CANCEL_REQUESTING); + + return m_cancelationReason; + } + } + + #region ICancellable implementation + + public void Cancel() { + Cancel(null); + } + + public void Cancel(Exception reason) { + if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING)) { + m_cancelationReason = reason; + m_cancelRequest = CANCEL_REQUESTED; + if (m_cancelationHandlers != null) { + Action<Exception> handler; + while (m_cancelationHandlers.TryDequeue(out handler)) + handler(m_cancelationReason); + } + } + } + + #endregion + } +} +