Mercurial > pub > ImplabNet
view Implab/AbstractPromise.cs @ 125:f803565868a4 v2
improved performance of promises
author | cin |
---|---|
date | Thu, 15 Jan 2015 12:09:20 +0300 |
parents | 2573b562e328 |
children | 671f60cd0250 |
line wrap: on
line source
using System; using Implab.Parallels; using System.Threading; using System.Reflection; namespace Implab { public abstract class AbstractPromise<THandler> { const int UNRESOLVED_SATE = 0; const int TRANSITIONAL_STATE = 1; const int SUCCEEDED_STATE = 2; const int REJECTED_STATE = 3; const int CANCELLED_STATE = 4; const int RESERVED_HANDLERS_COUNT = 4; int m_state; Exception m_error; int m_handlersCount; readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; MTQueue<THandler> m_extraHandlers; int m_handlerPointer = -1; int m_handlersCommited; #region state managment bool BeginTransit() { return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); } void CompleteTransit(int state) { if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); } void WaitTransition() { while (m_state == TRANSITIONAL_STATE) { Thread.MemoryBarrier(); } } protected void BeginSetResult() { if (!BeginTransit()) { WaitTransition(); if (m_state != CANCELLED_STATE) throw new InvalidOperationException("The promise is already resolved"); } } protected void EndSetResult() { CompleteTransit(SUCCEEDED_STATE); OnSuccess(); } /// <summary> /// Выполняет обещание, сообщая об ошибке /// </summary> /// <remarks> /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные /// будут проигнорированы. /// </remarks> /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> protected void SetError(Exception error) { if (BeginTransit()) { m_error = error is PromiseTransientException ? error.InnerException : error; CompleteTransit(REJECTED_STATE); OnError(); } else { WaitTransition(); if (m_state == SUCCEEDED_STATE) throw new InvalidOperationException("The promise is already resolved"); } } /// <summary> /// Отменяет операцию, если это возможно. /// </summary> /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> protected void SetCancelled() { if (BeginTransit()) { CompleteTransit(CANCELLED_STATE); OnCancelled(); } } protected abstract void SignalSuccess(THandler handler); protected abstract void SignalError(THandler handler, Exception error); protected abstract void SignalCancelled(THandler handler); void OnSuccess() { var hp = m_handlerPointer; var slot = hp +1 ; while (slot < m_handlersCommited) { if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { SignalSuccess(m_handlers[slot]); } hp = m_handlerPointer; slot = hp +1 ; } if (m_extraHandlers != null) { THandler handler; while (m_extraHandlers.TryDequeue(out handler)) SignalSuccess(handler); } } void OnError() { var hp = m_handlerPointer; var slot = hp +1 ; while (slot < m_handlersCommited) { if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { SignalError(m_handlers[slot],m_error); } hp = m_handlerPointer; slot = hp +1 ; } if (m_extraHandlers != null) { THandler handler; while (m_extraHandlers.TryDequeue(out handler)) SignalError(handler, m_error); } } void OnCancelled() { var hp = m_handlerPointer; var slot = hp +1 ; while (slot < m_handlersCommited) { if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { SignalCancelled(m_handlers[slot]); } hp = m_handlerPointer; slot = hp +1 ; } if (m_extraHandlers != null) { THandler handler; while (m_extraHandlers.TryDequeue(out handler)) SignalCancelled(handler); } } #endregion protected abstract void Listen(PromiseEventType events, Action handler); #region synchronization traits protected void WaitResult(int timeout) { if (!IsResolved) { var lk = new object(); Listen(PromiseEventType.All, () => { lock(lk) { Monitor.Pulse(lk); } }); lock (lk) { while(!IsResolved) { if(!Monitor.Wait(lk,timeout)) throw new TimeoutException(); } } } switch (m_state) { case SUCCEEDED_STATE: return; case CANCELLED_STATE: throw new OperationCanceledException(); case REJECTED_STATE: throw new TargetInvocationException(m_error); default: throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); } } #endregion #region handlers managment protected void AddHandler(THandler handler) { if (m_state > 1) { // the promise is in the resolved state, just invoke the handler InvokeHandler(handler); } else { var slot = Interlocked.Increment(ref m_handlersCount) - 1; if (slot < RESERVED_HANDLERS_COUNT) { m_handlers[slot] = handler; while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { } if (m_state > 1) { do { var hp = m_handlerPointer; slot = hp + 1; if (slot < m_handlersCommited) { if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) continue; InvokeHandler(m_handlers[slot]); } break; } while(true); } } else { if (slot == RESERVED_HANDLERS_COUNT) { m_extraHandlers = new MTQueue<THandler>(); } else { while (m_extraHandlers == null) Thread.MemoryBarrier(); } m_extraHandlers.Enqueue(handler); if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) InvokeHandler(handler); } } } protected void InvokeHandler(THandler handler) { switch (m_state) { case SUCCEEDED_STATE: SignalSuccess(handler); break; case CANCELLED_STATE: SignalCancelled(handler); break; case REJECTED_STATE: SignalError(handler, m_error); break; default: throw new Exception(String.Format("Invalid promise state {0}", m_state)); } } #endregion #region IPromise implementation public void Join(int timeout) { WaitResult(timeout); } public void Join() { WaitResult(-1); } public bool IsResolved { get { Thread.MemoryBarrier(); return m_state > 1; } } public bool IsCancelled { get { Thread.MemoryBarrier(); return m_state == CANCELLED_STATE; } } #endregion #region ICancellable implementation public void Cancel() { SetCancelled(); } #endregion } }