view Implab/AbstractEvent.cs @ 240:fa6cbf4d8841 v3

refactoring, moving to dotnetcore, simplifying promises
author cin
date Tue, 23 Jan 2018 19:39:21 +0300
parents d6fe09f5592c
children cbe10ac0731e
line wrap: on
line source

using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;

namespace Implab {
    public abstract class AbstractEvent<THandler> : ICancellable { 

        // Values stored in m_state. Everything above TRANSITIONAL_STATE is a final
        // state; the rest of the class tests "m_state > 1" to mean "resolved".
        const int UNRESOLVED_SATE = 0;
        const int TRANSITIONAL_STATE = 1;
        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;
        protected const int CANCELLED_STATE = 4;

        // Values stored in m_cancelRequest. CANCEL_REQUESTING is held only while
        // Cancel(Exception) is storing the cancellation reason.
        const int CANCEL_NOT_REQUESTED = 0;
        const int CANCEL_REQUESTING = 1;
        const int CANCEL_REQUESTED = 2;

        // Size of the m_handlers array; handlers registered beyond this count
        // are stored in m_extraHandlers.
        const int RESERVED_HANDLERS_COUNT = 4;

        // Current state, one of the *_STATE constants; changed only through
        // BeginTransit/CompleteTransit.
        int m_state;
        // The rejection error or the cancellation reason, written while the
        // object is in TRANSITIONAL_STATE.
        Exception m_error;
        // Number of slots handed out by AddHandler so far (array slots and
        // queue entries together).
        int m_handlersCount;

        // Allocated lazily by the AddHandler call that obtains slot 0.
        THandler[] m_handlers;
        // Allocated lazily by the AddHandler call that obtains slot RESERVED_HANDLERS_COUNT.
        SimpleAsyncQueue<THandler> m_extraHandlers;
        // Index of the last m_handlers slot claimed for signalling; -1 when none.
        int m_handlerPointer = -1;
        // Number of leading m_handlers slots whose handler has been stored and published.
        int m_handlersCommited;

        // Cancellation request state, one of the CANCEL_* constants.
        int m_cancelRequest;
        // Reason supplied to Cancel(Exception); may be null.
        Exception m_cancelationReason;
        // Callbacks registered through CancellationRequested; created lazily there.
        // This field was referenced by CancellationRequested and Cancel but was
        // not declared, which prevented the class from compiling.
        SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers;

        #region state management
        /// <summary>
        /// Atomically moves the state from unresolved to transitional.
        /// </summary>
        /// <returns><c>true</c> if this call performed the switch; <c>false</c> if the
        /// state was anything other than unresolved.</returns>
        bool BeginTransit() {
            var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
            return previous == UNRESOLVED_SATE;
        }

        /// <summary>
        /// Atomically replaces the transitional state with the specified final state.
        /// </summary>
        /// <param name="state">The state to store.</param>
        /// <exception cref="InvalidOperationException">The object was not in the transitional state.</exception>
        void CompleteTransit(int state) {
            var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
            if (previous == TRANSITIONAL_STATE)
                return;

            throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Spins until the state is no longer transitional.
        /// </summary>
        void WaitTransition() {
            for (;;) {
                if (m_state != TRANSITIONAL_STATE)
                    return;
                // the barrier forces m_state to be re-read on the next iteration
                Thread.MemoryBarrier();
            }
        }

        /// <summary>
        /// Tries to enter the transitional state in order to store a result.
        /// </summary>
        /// <returns><c>true</c> if the caller now owns the transition and must finish it
        /// with <see cref="EndSetResult"/>; <c>false</c> if the event has already been cancelled.</returns>
        /// <exception cref="InvalidOperationException">The event is already resolved and the
        /// final state is not cancelled.</exception>
        protected bool BeginSetResult() {
            if (BeginTransit())
                return true;

            // somebody else owns or has completed the transition, wait for the final state
            WaitTransition();

            if (m_state == CANCELLED_STATE)
                return false;

            throw new InvalidOperationException("The promise is already resolved");
        }

        /// <summary>
        /// Completes a transition started with <see cref="BeginSetResult"/>: switches the
        /// state to succeeded and signals the registered handlers.
        /// </summary>
        /// <exception cref="InvalidOperationException">The object is not in the transitional state.</exception>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
            Signal();
        }



        /// <summary>
        /// Resolves the promise by reporting an error.
        /// </summary>
        /// <remarks>
        /// Since the promise has to work in a multithreaded environment, several threads may
        /// report an error at the same time; only the first one is used as the result, the
        /// others are ignored.
        /// </remarks>
        /// <param name="error">The exception that occurred during the operation.</param>
        /// <exception cref="InvalidOperationException">The promise has already been resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (!BeginTransit()) {
                // lost the race: wait for the winner to finish, then complain only
                // if the promise ended up succeeded
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
                return;
            }

            m_error = error;
            CompleteTransit(REJECTED_STATE);

            Signal();
        }

        /// <summary>
        /// Cancels the operation, if possible.
        /// </summary>
        /// <remarks>Use the <see cref="IsCancelled"/> property to find out whether the operation
        /// has actually been cancelled. The call does nothing when the state is no longer unresolved.</remarks>
        /// <param name="reason">The reason of the cancellation, stored as the error of the event.</param>
        protected void SetCancelled(Exception reason) {
            if (!BeginTransit())
                return;

            m_error = reason;
            CompleteTransit(CANCELLED_STATE);
            Signal();
        }

        /// <summary>
        /// Delivers the notification to a single handler. Implemented by derived classes.
        /// </summary>
        /// <param name="handler">The handler to notify.</param>
        /// <param name="signal">The value of the state field at the moment of the call; this class
        /// only calls the method after the state has become greater than the transitional one.</param>
        protected abstract void SignalHandler(THandler handler, int signal);

        /// <summary>
        /// Notifies the registered handlers: first the committed slots of the handlers array,
        /// then everything that can be dequeued from the extra handlers queue.
        /// </summary>
        void Signal() {
            var hp = m_handlerPointer;
            var slot = hp +1 ;
            while (slot < m_handlersCommited) {
                // claim the slot by advancing m_handlerPointer; only the thread whose
                // CompareExchange succeeds signals the handler in that slot
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalHandler(m_handlers[slot], m_state);
                }
                // re-read the pointer, it may have been advanced by another thread
                hp = m_handlerPointer;
                slot = hp +1 ;
            }


            // the queue exists only if more than RESERVED_HANDLERS_COUNT handlers were added
            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalHandler(handler, m_state);
            }
        }

        #endregion

        /// <summary>
        /// Returns the signal that <see cref="WaitResult"/> waits on when the event is not
        /// resolved yet. Implemented by derived classes.
        /// </summary>
        // NOTE(review): presumably the returned signal is set once the event is resolved - confirm in implementations.
        protected abstract Signal GetResolveSignal();

        #region synchronization traits
        /// <summary>
        /// Blocks until the event is resolved and converts a non-successful outcome into an exception.
        /// </summary>
        /// <param name="timeout">Passed unchanged to the wait of the resolve signal
        /// (presumably milliseconds - confirm against the Signal type).</param>
        /// <exception cref="TimeoutException">The event is not resolved and the wait returned <c>false</c>.</exception>
        /// <exception cref="OperationCanceledException">The event has been cancelled.</exception>
        /// <exception cref="TargetInvocationException">The event has been rejected; the inner exception is the stored error.</exception>
        /// <exception cref="ApplicationException">The state is not one of the final states.</exception>
        protected void WaitResult(int timeout) {
            var resolved = IsResolved || GetResolveSignal().Wait(timeout);
            if (!resolved)
                throw new TimeoutException();

            switch (m_state) {
                case REJECTED_STATE:
                    throw new TargetInvocationException(m_error);
                case CANCELLED_STATE:
                    throw new OperationCanceledException("The operation has been cancelled", m_error);
                case SUCCEEDED_STATE:
                    return;
                default:
                    throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
            }
        }
        #endregion

        #region handlers management

        /// <summary>
        /// Registers a handler. If the event is already resolved the handler is signalled
        /// immediately, otherwise it is stored either in the handlers array (the first
        /// RESERVED_HANDLERS_COUNT registrations) or in the extra handlers queue.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {

            if (m_state > 1) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                // reserve a unique slot number for this handler
                var slot = Interlocked.Increment(ref m_handlersCount) - 1;

                if (slot < RESERVED_HANDLERS_COUNT) {

                    if (slot == 0) {
                        // the owner of the first slot allocates the array
                        m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
                    } else {
                        // other threads spin until the array becomes visible
                        while (m_handlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_handlers[slot] = handler;

                    // publish the slot: the commit counter advances from slot to slot + 1,
                    // so this spins until all preceding slots have been committed
                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
                    }

                    if (m_state > 1) {
                        // the promise has been resolved in the meantime, so try to claim
                        // and signal one committed slot that has not been signalled yet
                        do {
                            var hp = m_handlerPointer;
                            slot = hp + 1;
                            if (slot < m_handlersCommited) {
                                // another thread moved the pointer first, retry with the new value
                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
                                    continue;
                                SignalHandler(m_handlers[slot], m_state);
                            }
                            break;
                        } while(true);
                    }
                } else {
                    if (slot == RESERVED_HANDLERS_COUNT) {
                        // the owner of the first overflow slot allocates the queue
                        m_extraHandlers = new SimpleAsyncQueue<THandler>();
                    } else {
                        // other threads spin until the queue becomes visible
                        while (m_extraHandlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_extraHandlers.Enqueue(handler);

                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                    // If the promise has been resolved while we were adding the handler to
                    // the queue, we can't guarantee that someone is still processing the queue,
                    // therefore we need to fetch a handler from the queue and execute it.
                    // Note that the fetched handler may be not the one that we have added,
                    // and it is even possible that no handler is fetched at all.
                        SignalHandler(handler, m_state);
                }
            }
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets whether the event has reached a final state (succeeded, rejected or cancelled).
        /// </summary>
        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                // SUCCEEDED_STATE is the lowest of the final states
                return m_state >= SUCCEEDED_STATE;
            }
        }

        /// <summary>
        /// Gets whether the event has been resolved as cancelled.
        /// </summary>
        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                var current = m_state;
                return CANCELLED_STATE == current;
            }
        }

        #endregion

        /// <summary>
        /// Gets the exception stored when the event was rejected or cancelled;
        /// the field is not assigned on the success path.
        /// </summary>
        public Exception Error {
            get {
                return m_error;
            }
        }

        /// <summary>
        /// Cancels the operation with the requested reason if a cancellation has been requested.
        /// </summary>
        /// <returns><c>true</c> if a cancellation was requested and <see cref="CancelOperation"/>
        /// has been called; otherwise <c>false</c>.</returns>
        public bool CancelOperationIfRequested() {
            if (!IsCancellationRequested)
                return false;

            CancelOperation(CancellationReason);
            return true;
        }

        /// <summary>
        /// Moves the event to the cancelled state with the specified reason. The default
        /// implementation delegates to <see cref="SetCancelled"/>; derived classes may override it.
        /// </summary>
        /// <param name="reason">The reason of the cancellation.</param>
        public virtual void CancelOperation(Exception reason) {
            SetCancelled(reason);
        }

        /// <summary>
        /// Registers a callback that is invoked when a cancellation of the operation is requested.
        /// </summary>
        /// <remarks>
        /// If the cancellation has already been requested the callback is invoked immediately
        /// on the calling thread and is not stored. Otherwise it is queued and invoked by
        /// <see cref="Cancel(Exception)"/>.
        /// </remarks>
        /// <param name="handler">The callback; receives the cancellation reason.</param>
        public void CancellationRequested(Action<Exception> handler) {
            Safe.ArgumentNotNull(handler, "handler");

            if (IsCancellationRequested) {
                handler(CancellationReason);
                // Do not enqueue a handler that has already been called: previously the
                // code fell through, enqueued it and the check below invoked it a second time.
                return;
            }

            if (m_cancelationHandlers == null)
                Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);

            m_cancelationHandlers.Enqueue(handler);

            // The cancellation may have been requested while the handler was being enqueued,
            // in which case Cancel may already have drained the queue. Fetch one handler
            // (not necessarily the one just added) and run it here.
            // Original author's note: TryDequeue implies a memory barrier, hence the direct
            // read of m_cancelationReason.
            if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
                handler(m_cancelationReason);
        }

        /// <summary>
        /// Gets whether a cancellation has been requested. While a request is being
        /// registered (the intermediate requesting state) the getter spins until it completes.
        /// </summary>
        public bool IsCancellationRequested {
            get {
                while (true) {
                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
                        return false;
                    if (m_cancelRequest == CANCEL_REQUESTED)
                        return true;
                    // the request is in progress, re-read the field on the next iteration
                    Thread.MemoryBarrier();
                }
            }
        }

        /// <summary>
        /// Gets the reason passed to <see cref="Cancel(Exception)"/>. Spins while a
        /// cancellation request is in the middle of being registered.
        /// </summary>
        public Exception CancellationReason {
            get {
                Thread.MemoryBarrier();
                while (m_cancelRequest == CANCEL_REQUESTING)
                    Thread.MemoryBarrier();

                return m_cancelationReason;
            }
        }

        #region ICancellable implementation

        /// <summary>
        /// Requests the cancellation of the operation without specifying a reason
        /// (passes <c>null</c> to <see cref="Cancel(Exception)"/>).
        /// </summary>
        public void Cancel() {
            Cancel(null);
        }

        /// <summary>
        /// Requests the cancellation of the operation. Only the first request takes effect:
        /// it stores the reason and invokes the queued cancellation callbacks; later calls do nothing.
        /// </summary>
        /// <param name="reason">The reason of the cancellation, passed to the callbacks.</param>
        public void Cancel(Exception reason) {
            var previous = Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED);
            if (previous != CANCEL_NOT_REQUESTED)
                return;

            // the reason is stored before the request is marked as completed
            m_cancelationReason = reason;
            m_cancelRequest = CANCEL_REQUESTED;

            if (m_cancelationHandlers == null)
                return;

            Action<Exception> pending;
            while (m_cancelationHandlers.TryDequeue(out pending))
                pending(m_cancelationReason);
        }

        #endregion
    }
}