view Implab/AbstractEvent.cs @ 209:a867536c68fc v2

Bound promise to CancellationToken. Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise.
author cin
date Wed, 16 Nov 2016 03:06:08 +0300
parents 75103928da09
children d6fe09f5592c
line wrap: on
line source

using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;

namespace Implab {
    public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { 

        // Values stored in m_state. Any value greater than TRANSITIONAL_STATE
        // is treated as "resolved" (see IsResolved and AddHandler).
        const int UNRESOLVED_SATE = 0;
        const int TRANSITIONAL_STATE = 1;
        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;
        protected const int CANCELLED_STATE = 4;

        // Values stored in m_cancelRequest. CANCEL_REQUESTING is the short
        // window in Cancel(Exception) while the reason is being written.
        const int CANCEL_NOT_REQUESTED = 0;
        const int CANCEL_REQUESTING = 1;
        const int CANCEL_REQUESTED = 2;

        // Size of the m_handlers array; handlers registered beyond this count
        // are stored in m_extraHandlers instead.
        const int RESERVED_HANDLERS_COUNT = 4;

        // Current state, one of the *_STATE constants above.
        int m_state;
        // Exception stored by SetError or SetCancelled.
        Exception m_error;
        // Number of handler slots allocated so far by AddHandler.
        int m_handlersCount;

        //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
        // Array for the first RESERVED_HANDLERS_COUNT handlers; created by the
        // AddHandler call that allocates slot 0.
        THandler[] m_handlers;
        // Queue for handlers that do not fit into m_handlers; created by the
        // AddHandler call that allocates slot RESERVED_HANDLERS_COUNT.
        MTQueue<THandler> m_extraHandlers;
        // Index of the last m_handlers slot claimed for signaling; -1 means none.
        int m_handlerPointer = -1;
        // Number of m_handlers slots that have been written and published.
        int m_handlersCommited;

        // Cancellation request phase, one of the CANCEL_* constants above.
        int m_cancelRequest;
        // Reason passed to Cancel(Exception); may be null.
        Exception m_cancelationReason;
        // Callbacks registered through CancellationRequested; created on first use.
        MTQueue<Action<Exception>> m_cancelationHandlers;


        #region state managment
        /// <summary>
        /// Atomically tries to move the object from the unresolved state to the transitional state.
        /// </summary>
        /// <returns><c>true</c> if this call performed the transition; otherwise <c>false</c>.</returns>
        bool BeginTransit() {
            var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
            return previous == UNRESOLVED_SATE;
        }

        /// <summary>
        /// Atomically replaces the transitional state with the specified final state.
        /// </summary>
        /// <param name="state">The state to store in place of the transitional one.</param>
        /// <exception cref="InvalidOperationException">The object was not in the transitional state.</exception>
        void CompleteTransit(int state) {
            var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
            if (previous == TRANSITIONAL_STATE)
                return;

            throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Spins, issuing a memory barrier on every iteration, until the state
        /// is no longer the transitional one.
        /// </summary>
        void WaitTransition() {
            for (;;) {
                if (m_state != TRANSITIONAL_STATE)
                    return;
                Thread.MemoryBarrier();
            }
        }

        /// <summary>
        /// Starts setting a successful result by moving the object into the transitional state.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the caller now owns the transition and must finish it with
        /// <see cref="EndSetResult"/>; <c>false</c> if the object has already been cancelled.
        /// </returns>
        /// <exception cref="InvalidOperationException">
        /// The object has already left the unresolved state and is not cancelled.
        /// </exception>
        protected bool BeginSetResult() {
            if (BeginTransit())
                return true;

            // Someone else owns or has finished the transition; wait for the final state.
            WaitTransition();
            if (m_state == CANCELLED_STATE)
                return false;

            throw new InvalidOperationException("The promise is already resolved");
        }

        /// <summary>
        /// Finishes the transition started by <see cref="BeginSetResult"/>: switches the
        /// state to succeeded and then signals the registered handlers.
        /// </summary>
        /// <exception cref="InvalidOperationException">The object is not in the transitional state.</exception>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
            Signal();
        }



        /// <summary>
        /// Completes the promise by reporting an error.
        /// </summary>
        /// <remarks>
        /// Because the promise has to work in a multithreaded environment, several threads
        /// may report an error at the same time; only the first one is used as the result,
        /// the others are ignored.
        /// </remarks>
        /// <param name="error">The exception raised while performing the operation.</param>
        /// <exception cref="InvalidOperationException">The promise has already been resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (!BeginTransit()) {
                // Lost the race: wait for the winner to publish the final state.
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
                return;
            }

            m_error = error;
            CompleteTransit(REJECTED_STATE);

            Signal();
        }

        /// <summary>
        /// Cancels the operation if that is still possible.
        /// </summary>
        /// <remarks>
        /// Use the <see cref="IsCancelled"/> property to find out whether the operation
        /// was actually cancelled; the call does nothing when the object has already
        /// left the unresolved state.
        /// </remarks>
        /// <param name="reason">The exception stored as the reason of the cancellation.</param>
        protected void SetCancelled(Exception reason) {
            if (!BeginTransit())
                return;

            m_error = reason;
            CompleteTransit(CANCELLED_STATE);
            Signal();
        }

        /// <summary>
        /// Delivers a signal to a single handler. Implemented by derived classes.
        /// </summary>
        /// <param name="handler">The handler to notify.</param>
        /// <param name="signal">The signal value; callers in this class pass the current state.</param>
        protected abstract void SignalHandler(THandler handler, int signal);

        /// <summary>
        /// Notifies the registered handlers: first the committed slots of the handlers
        /// array, then everything that can be dequeued from the extra handlers queue.
        /// </summary>
        void Signal() {
            var hp = m_handlerPointer;
            var slot = hp +1 ;
            while (slot < m_handlersCommited) {
                // Claim the slot by advancing the shared pointer; only the caller
                // whose compare-exchange succeeds signals that handler.
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalHandler(m_handlers[slot], m_state);
                }
                // Re-read the pointer, it may have been advanced by another caller.
                hp = m_handlerPointer;
                slot = hp +1 ;
            }


            // Drain the overflow queue, if it has been created.
            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalHandler(handler, m_state);
            }
        }

        #endregion

        /// <summary>
        /// Returns the signal that <see cref="WaitResult"/> waits on when the object
        /// is not resolved yet. Implemented by derived classes.
        /// </summary>
        protected abstract Signal GetResolveSignal();

        #region synchronization traits
        /// <summary>
        /// Waits until the object is resolved and converts a non-successful outcome
        /// into an exception.
        /// </summary>
        /// <param name="timeout">The timeout passed to the resolve signal's <c>Wait</c> method.</param>
        /// <exception cref="TimeoutException">The wait on the resolve signal returned <c>false</c>.</exception>
        /// <exception cref="OperationCanceledException">The operation was cancelled; the stored error is the inner exception.</exception>
        /// <exception cref="TargetInvocationException">The operation was rejected; the stored error is the inner exception.</exception>
        /// <exception cref="ApplicationException">The state has an unexpected value.</exception>
        protected void WaitResult(int timeout) {
            // The signal is only consulted when the object is not resolved yet.
            if (!IsResolved && !GetResolveSignal().Wait(timeout))
                throw new TimeoutException();

            var state = m_state;
            if (state == SUCCEEDED_STATE)
                return;
            if (state == CANCELLED_STATE)
                throw new OperationCanceledException("The operation has been cancelled", m_error);
            if (state == REJECTED_STATE)
                throw new TargetInvocationException(m_error);

            throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
        }
        #endregion

        #region handlers managment

        /// <summary>
        /// Registers a handler. If the object is already resolved the handler is signaled
        /// immediately; otherwise it is stored in the handlers array or, once the array
        /// is full, in the extra handlers queue.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {

            if (m_state > 1) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                // allocate a unique slot index for this handler
                var slot = Interlocked.Increment(ref m_handlersCount) - 1;

                if (slot < RESERVED_HANDLERS_COUNT) {

                    if (slot == 0) {
                        // the owner of the first slot creates the array
                        m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
                    } else {
                        // the other callers spin until the array becomes visible
                        while (m_handlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_handlers[slot] = handler;

                    // publish the slot; slots are committed strictly in index order,
                    // so spin until all previous slots have been committed
                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
                    }

                    if (m_state > 1) {
                        // the promise has been resolved while the handler was being added:
                        // try to claim the next unsignaled slot and signal it, retrying
                        // when another thread moves the pointer first
                        do {
                            var hp = m_handlerPointer;
                            slot = hp + 1;
                            if (slot < m_handlersCommited) {
                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
                                    continue;
                                SignalHandler(m_handlers[slot], m_state);
                            }
                            break;
                        } while(true);
                    }
                } else {
                    if (slot == RESERVED_HANDLERS_COUNT) {
                        // the owner of the first overflow slot creates the queue
                        m_extraHandlers = new MTQueue<THandler>();
                    } else {
                        // the other callers spin until the queue becomes visible
                        while (m_extraHandlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_extraHandlers.Enqueue(handler);

                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                    // if the promise has been resolved while we were adding the handler to the queue
                    // we can't guarantee that someone is still processing it,
                    // therefore we need to fetch a handler from the queue and execute it.
                    // Note that the fetched handler may not be the one that we have added,
                    // and we may even fetch no handler at all.
                        SignalHandler(handler, m_state);
                }
            }
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets a value indicating whether the state is past the transitional one,
        /// i.e. succeeded, rejected or cancelled.
        /// </summary>
        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                return TRANSITIONAL_STATE < m_state;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the object is in the cancelled state.
        /// </summary>
        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                var state = m_state;
                return state == CANCELLED_STATE;
            }
        }

        #endregion

        /// <summary>
        /// Gets the exception stored by <see cref="SetError"/> or <see cref="SetCancelled"/>;
        /// <c>null</c> if none has been stored.
        /// </summary>
        public Exception Error {
            get {
                return m_error;
            }
        }

        /// <summary>
        /// Cancels the operation with the requested reason when cancellation has been requested.
        /// </summary>
        /// <returns><c>true</c> if cancellation was requested and <see cref="CancelOperation"/> was called; otherwise <c>false</c>.</returns>
        public bool CancelOperationIfRequested() {
            if (!IsCancellationRequested)
                return false;

            CancelOperation(CancellationReason);
            return true;
        }

        /// <summary>
        /// Cancels the operation. The default implementation forwards the reason to
        /// <see cref="SetCancelled"/>; derived classes may override it.
        /// </summary>
        /// <param name="reason">The reason of the cancellation.</param>
        public virtual void CancelOperation(Exception reason) {
            SetCancelled(reason);
        }

        /// <summary>
        /// Registers a callback that is invoked when cancellation of this object is requested.
        /// </summary>
        /// <remarks>
        /// If cancellation has already been requested, the callback is invoked immediately
        /// on the calling thread and is not queued. Otherwise it is queued and invoked by
        /// <see cref="Cancel(Exception)"/>, or by this method when the request arrives while
        /// the callback is being queued.
        /// </remarks>
        /// <param name="handler">The callback that receives the cancellation reason; checked with <c>Safe.ArgumentNotNull</c>.</param>
        public void CancellationRequested(Action<Exception> handler) {
            Safe.ArgumentNotNull(handler, "handler");

            if (IsCancellationRequested) {
                // Already requested: run the callback once and stop here. Queuing it
                // as well would make the check below dequeue and run it a second time.
                handler(CancellationReason);
                return;
            }

            if (m_cancelationHandlers == null)
                Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);

            m_cancelationHandlers.Enqueue(handler);

            // Cancellation may have been requested while the callback was being queued.
            // In that case nobody is guaranteed to drain the queue any more, so take one
            // callback (not necessarily the one just added) and run it here.
            if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
                // TryDequeue implies MemoryBarrier()
                handler(m_cancelationReason);
        }

        /// <summary>
        /// Gets a value indicating whether cancellation has been requested. While a
        /// request is still being recorded the getter spins until it is complete.
        /// </summary>
        public bool IsCancellationRequested {
            get {
                while (true) {
                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
                        return false;
                    if (m_cancelRequest == CANCEL_REQUESTED)
                        return true;
                    // The request is in progress; refresh and look again.
                    Thread.MemoryBarrier();
                }
            }
        }

        /// <summary>
        /// Gets the reason passed to <see cref="Cancel(Exception)"/>. While a request is
        /// still being recorded the getter spins until it is complete.
        /// </summary>
        public Exception CancellationReason {
            get {
                Thread.MemoryBarrier();
                while (m_cancelRequest == CANCEL_REQUESTING)
                    Thread.MemoryBarrier();

                return m_cancelationReason;
            }
        }

        #region ICancellable implementation

        /// <summary>
        /// Requests cancellation without a reason; equivalent to <c>Cancel(null)</c>.
        /// </summary>
        public void Cancel() {
            Cancel(null);
        }

        /// <summary>
        /// Requests cancellation with the specified reason and invokes the callbacks
        /// registered so far. Only the first request has an effect; later calls return
        /// without doing anything.
        /// </summary>
        /// <param name="reason">The reason of the cancellation; may be <c>null</c>.</param>
        public void Cancel(Exception reason) {
            var previous = Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED);
            if (previous != CANCEL_NOT_REQUESTED)
                return;

            // Store the reason before the request is marked as complete.
            m_cancelationReason = reason;
            m_cancelRequest = CANCEL_REQUESTED;

            if (m_cancelationHandlers == null)
                return;

            Action<Exception> callback;
            while (m_cancelationHandlers.TryDequeue(out callback))
                callback(m_cancelationReason);
        }

        #endregion
    }
}