Mercurial > pub > ImplabNet
view Implab/AbstractPromise.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 18:19:41 +0300 |
parents | 2573b562e328 |
children | f803565868a4 |
line wrap: on
line source
using System; using Implab.Parallels; using System.Threading; using System.Reflection; namespace Implab { public abstract class AbstractPromise<THandler> { const int UNRESOLVED_SATE = 0; const int TRANSITIONAL_STATE = 1; const int SUCCEEDED_STATE = 2; const int REJECTED_STATE = 3; const int CANCELLED_STATE = 4; int m_state; Exception m_error; readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); #region state managment bool BeginTransit() { return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); } void CompleteTransit(int state) { if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); } void WaitTransition() { while (m_state == TRANSITIONAL_STATE) { Thread.MemoryBarrier(); } } protected void BeginSetResult() { if (!BeginTransit()) { WaitTransition(); if (m_state != CANCELLED_STATE) throw new InvalidOperationException("The promise is already resolved"); } } protected void EndSetResult() { CompleteTransit(SUCCEEDED_STATE); OnSuccess(); } /// <summary> /// Выполняет обещание, сообщая об ошибке /// </summary> /// <remarks> /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные /// будут проигнорированы. /// </remarks> /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> protected void SetError(Exception error) { if (BeginTransit()) { m_error = error is PromiseTransientException ? error.InnerException : error; CompleteTransit(REJECTED_STATE); OnError(); } else { WaitTransition(); if (m_state == SUCCEEDED_STATE) throw new InvalidOperationException("The promise is already resolved"); } } /// <summary> /// Отменяет операцию, если это возможно. /// </summary> /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> protected void SetCancelled() { if (BeginTransit()) { CompleteTransit(CANCELLED_STATE); OnCancelled(); } } protected abstract void SignalSuccess(THandler handler); protected abstract void SignalError(THandler handler, Exception error); protected abstract void SignalCancelled(THandler handler); void OnSuccess() { THandler handler; while (m_handlers.TryDequeue(out handler)) SignalSuccess(handler); } void OnError() { THandler handler; while (m_handlers.TryDequeue(out handler)) SignalError(handler,m_error); } void OnCancelled() { THandler handler; while (m_handlers.TryDequeue(out handler)) SignalCancelled(handler); } #endregion protected abstract void Listen(PromiseEventType events, Action handler); #region synchronization traits protected void WaitResult(int timeout) { if (!IsResolved) { var lk = new object(); Listen(PromiseEventType.All, () => { lock(lk) { Monitor.Pulse(lk); } }); lock (lk) { while(!IsResolved) { if(!Monitor.Wait(lk,timeout)) throw new TimeoutException(); } } } switch (m_state) { case SUCCEEDED_STATE: return; case CANCELLED_STATE: throw new OperationCanceledException(); case REJECTED_STATE: throw new TargetInvocationException(m_error); default: throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); } } #endregion #region handlers managment protected void AddHandler(THandler handler) { if (IsResolved) { InvokeHandler(handler); } else { // the promise is in the resolved state, just invoke the handler m_handlers.Enqueue(handler); if (IsResolved && m_handlers.TryDequeue(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) InvokeHandler(handler); } } protected void InvokeHandler(THandler handler) { switch (m_state) { case SUCCEEDED_STATE: SignalSuccess(handler); break; case CANCELLED_STATE: SignalCancelled(handler); break; case REJECTED_STATE: SignalError(handler, m_error); break; default: throw new Exception(String.Format("Invalid promise state {0}", m_state)); } } #endregion #region IPromise implementation public void Join(int timeout) { WaitResult(timeout); } public void Join() { WaitResult(-1); } public bool IsResolved { get { Thread.MemoryBarrier(); return m_state > 1; } } public bool IsCancelled { get { Thread.MemoryBarrier(); return m_state == CANCELLED_STATE; } } #endregion #region ICancellable implementation public void Cancel() { SetCancelled(); } #endregion } }