Mercurial > pub > ImplabNet
diff Implab/AbstractPromise.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | |
children | f803565868a4 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/AbstractPromise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,219 @@ +using System; +using Implab.Parallels; +using System.Threading; +using System.Reflection; + +namespace Implab { + public abstract class AbstractPromise<THandler> { + + const int UNRESOLVED_SATE = 0; + const int TRANSITIONAL_STATE = 1; + const int SUCCEEDED_STATE = 2; + const int REJECTED_STATE = 3; + const int CANCELLED_STATE = 4; + + int m_state; + Exception m_error; + + readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); + + #region state managment + bool BeginTransit() { + return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + } + + void CompleteTransit(int state) { + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); + } + + void WaitTransition() { + while (m_state == TRANSITIONAL_STATE) { + Thread.MemoryBarrier(); + } + } + + protected void BeginSetResult() { + if (!BeginTransit()) { + WaitTransition(); + if (m_state != CANCELLED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + protected void EndSetResult() { + CompleteTransit(SUCCEEDED_STATE); + OnSuccess(); + } + + + + /// <summary> + /// Выполняет обещание, сообщая об ошибке + /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> + /// <param name="error">Исключение возникшее при выполнении операции</param> + /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> + protected void SetError(Exception error) { + if (BeginTransit()) { + m_error = error is PromiseTransientException ? error.InnerException : error; + CompleteTransit(REJECTED_STATE); + OnError(); + } else { + WaitTransition(); + if (m_state == SUCCEEDED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + /// <summary> + /// Отменяет операцию, если это возможно. + /// </summary> + /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> + protected void SetCancelled() { + if (BeginTransit()) { + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } + } + + protected abstract void SignalSuccess(THandler handler); + + protected abstract void SignalError(THandler handler, Exception error); + + protected abstract void SignalCancelled(THandler handler); + + void OnSuccess() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalSuccess(handler); + } + + void OnError() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalError(handler,m_error); + } + + void OnCancelled() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalCancelled(handler); + } + + #endregion + + protected abstract void Listen(PromiseEventType events, Action handler); + + #region synchronization traits + protected void WaitResult(int timeout) { + if (!IsResolved) { + var lk = new object(); + + Listen(PromiseEventType.All, () => { + lock(lk) { + Monitor.Pulse(lk); + } + }); + + lock (lk) { + while(!IsResolved) { + if(!Monitor.Wait(lk,timeout)) + throw new TimeoutException(); + } + } + + } + switch (m_state) { + case SUCCEEDED_STATE: + return; + case CANCELLED_STATE: + throw new OperationCanceledException(); + case REJECTED_STATE: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); + } + } + #endregion + + #region handlers managment + + protected void AddHandler(THandler handler) { + + if (IsResolved) { + InvokeHandler(handler); + + } else { + // the promise is in the resolved state, just invoke the handler + m_handlers.Enqueue(handler); + + + if (IsResolved && m_handlers.TryDequeue(out handler)) + // if the promise have been resolved while we was adding the handler to the queue + // we can't guarantee that someone is still processing it + // therefore we need to fetch a handler from the queue and execute it + // note that fetched handler may be not the one that we have added + // even we can fetch no handlers at all :) + InvokeHandler(handler); + } + } + + protected void InvokeHandler(THandler handler) { + switch (m_state) { + case SUCCEEDED_STATE: + SignalSuccess(handler); + break; + case CANCELLED_STATE: + SignalCancelled(handler); + break; + case REJECTED_STATE: + SignalError(handler, m_error); + break; + default: + throw new Exception(String.Format("Invalid promise state {0}", m_state)); + } + } + + #endregion + + #region IPromise implementation + + public void Join(int timeout) { + WaitResult(timeout); + } + + public void Join() { + WaitResult(-1); + } + + public bool IsResolved { + get { + Thread.MemoryBarrier(); + return m_state > 1; + } + } + + public bool IsCancelled { + get { + Thread.MemoryBarrier(); + return m_state == CANCELLED_STATE; + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + SetCancelled(); + } + + #endregion + } +} +