Mercurial > pub > ImplabNet
diff Implab/AbstractPromise.cs @ 144:8c0b95069066 v2
DRAFT: refactoring
author | cin |
---|---|
date | Fri, 06 Mar 2015 15:45:26 +0300 |
parents | 16f926ee499d |
children | 706fccb85524 |
line wrap: on
line diff
--- a/Implab/AbstractPromise.cs Wed Mar 04 18:05:39 2015 +0300 +++ b/Implab/AbstractPromise.cs Fri Mar 06 15:45:26 2015 +0300 @@ -1,295 +1,135 @@ using System; using Implab.Parallels; -using System.Threading; -using System.Reflection; namespace Implab { - public abstract class AbstractPromise<THandler> { + public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise { + public struct HandlerDescriptor { + readonly Action m_handler; + readonly Action<Exception> m_error; + readonly Action<Exception> m_cancel; + readonly PromiseEventType m_mask; - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; - const int SUCCEEDED_STATE = 2; - const int REJECTED_STATE = 3; - const int CANCELLED_STATE = 4; - - const int RESERVED_HANDLERS_COUNT = 4; + public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) { + m_handler = success; + m_error = error; + m_cancel = cancel; + m_mask = PromiseEventType.Success; + } - int m_state; - Exception m_error; - int m_handlersCount; + public HandlerDescriptor(Action handler, PromiseEventType mask) { + m_handler = handler; + m_mask = mask; + } - readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - MTQueue<THandler> m_extraHandlers; - int m_handlerPointer = -1; - int m_handlersCommited; - - #region state managment - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); - } + public void SignalSuccess() { + if (m_mask & PromiseEventType.Success && m_handler != null) { + try { + m_handler(); + } catch (Exception err) { + // avoid calling handler twice in case of error + if (m_error != null) + SignalError(err); + } + } + } - void CompleteTransit(int state) { - if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) - throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); - } - - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); + public void SignalError(Exception err) { + if (m_error != null) { + try { + m_error(err); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } else if (m_mask & PromiseEventType.Error && m_handler != null) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } } - } - protected bool BeginSetResult() { - if (!BeginTransit()) { - WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - return false; + public void SignalCancel(Exception reason) { + if (m_cancel != null) { + try { + m_cancel(reason); + } catch (Exception err) { + SignalError(err); + } + } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } } - return true; - } - - protected void EndSetResult() { - CompleteTransit(SUCCEEDED_STATE); - OnSuccess(); } + #region implemented abstract members of AbstractPromise - /// <summary> - /// Выполняет обещание, сообщая об ошибке - /// </summary> - /// <remarks> - /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков - /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные - /// будут проигнорированы. - /// </remarks> - /// <param name="error">Исключение возникшее при выполнении операции</param> - /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> - protected void SetError(Exception error) { - if (BeginTransit()) { - if (error is OperationCanceledException) { - CompleteTransit(CANCELLED_STATE); - m_error = error.InnerException; - OnCancelled(); - } else { - m_error = error is PromiseTransientException ? error.InnerException : error; - CompleteTransit(REJECTED_STATE); - OnError(); - } - } else { - WaitTransition(); - if (m_state == SUCCEEDED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// <summary> - /// Отменяет операцию, если это возможно. - /// </summary> - /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> - protected void SetCancelled(Exception reason) { - if (BeginTransit()) { - m_error = reason; - CompleteTransit(CANCELLED_STATE); - OnCancelled(); - } + protected override void SignalSuccess(HandlerDescriptor handler) { + handler.SignalSuccess(); } - protected abstract void SignalSuccess(THandler handler); - - protected abstract void SignalError(THandler handler, Exception error); - - protected abstract void SignalCancelled(THandler handler, Exception reason); - - void OnSuccess() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalSuccess(m_handlers[slot]); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalSuccess(handler); - } + protected override void SignalError(HandlerDescriptor handler, Exception error) { + handler.SignalError(error); } - void OnError() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalError(m_handlers[slot],m_error); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalError(handler, m_error); - } + protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) { + handler.SignalCancel(reason); } - void OnCancelled() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalCancelled(m_handlers[slot], m_error); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalCancelled(handler, m_error); - } + protected override Signal GetResolveSignal() { + var signal = new Signal(); + On(signal.Set, PromiseEventType.All); } #endregion - protected abstract void Listen(PromiseEventType events, Action handler); - #region synchronization traits - protected void WaitResult(int timeout) { - if (!IsResolved) { - var lk = new object(); - - Listen(PromiseEventType.All, () => { - lock(lk) { - Monitor.Pulse(lk); - } - }); - - lock (lk) { - while(!IsResolved) { - if(!Monitor.Wait(lk,timeout)) - throw new TimeoutException(); - } - } - - } - switch (m_state) { - case SUCCEEDED_STATE: - return; - case CANCELLED_STATE: - throw new OperationCanceledException(); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); - } - } - #endregion - - #region handlers managment - - protected void AddHandler(THandler handler) { - - if (m_state > 1) { - // the promise is in the resolved state, just invoke the handler - InvokeHandler(handler); - } else { - var slot = Interlocked.Increment(ref m_handlersCount) - 1; - - if (slot < RESERVED_HANDLERS_COUNT) { - m_handlers[slot] = handler; - - while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { - } - - if (m_state > 1) { - do { - var hp = m_handlerPointer; - slot = hp + 1; - if (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) - continue; - InvokeHandler(m_handlers[slot]); - } - break; - } while(true); - } - } else { - if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new MTQueue<THandler>(); - } else { - while (m_extraHandlers == null) - Thread.MemoryBarrier(); - } - - m_extraHandlers.Enqueue(handler); - - if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding the handler to the queue - // we can't guarantee that someone is still processing it - // therefore we need to fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } + public Type PromiseType { + get { + return typeof(void); } } - protected void InvokeHandler(THandler handler) { - switch (m_state) { - case SUCCEEDED_STATE: - SignalSuccess(handler); - break; - case CANCELLED_STATE: - SignalCancelled(handler, m_error); - break; - case REJECTED_STATE: - SignalError(handler, m_error); - break; - default: - throw new Exception(String.Format("Invalid promise state {0}", m_state)); - } + public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel)); + return this; + } + + public IPromise On(Action success, Action<Exception> error) { + AddHandler(new HandlerDescriptor(success, error, null)); + return this; } - #endregion + public IPromise On(Action success) { + AddHandler(new HandlerDescriptor(success, null, null)); + return this; + } - #region IPromise implementation + public IPromise On(Action handler, PromiseEventType events) { + AddHandler(new HandlerDescriptor(handler,events)); + return this; + } - public void Join(int timeout) { - WaitResult(timeout); + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); } public void Join() { WaitResult(-1); } - public bool IsResolved { - get { - Thread.MemoryBarrier(); - return m_state > 1; - } + public void Join(int timeout) { + WaitResult(timeout); } - public bool IsCancelled { - get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; - } - } - - #endregion - - public Exception Error { - get { - return m_error; - } + protected void SetResult() { + BeginSetResult(); + EndSetResult(); } } }