view Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3

Working on promises
author cin
date Wed, 24 Jan 2018 03:03:21 +0300
parents fa6cbf4d8841
children b1e0ffdf3451
line wrap: on
line source

using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;
using System.Diagnostics;

namespace Implab {
    public abstract class AbstractEvent<THandler> where THandler : class {

        const int PENDING_SATE = 0;
        protected const int TRANSITIONAL_STATE = 1;

        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;

        volatile int m_state;
        Exception m_error;

        THandler m_handler;
        SimpleAsyncQueue<THandler> m_extraHandlers;

        #region state managment
        protected bool BeginTransit() {
            return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
        }

        protected void CompleteTransit(int state) {
#if DEBUG
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
#else
            m_state = state;
#endif
            Signal();
        }

        protected void WaitTransition() {
            if (m_state == TRANSITIONAL_STATE) {
                SpinWait spin;
                do {
                    spin.SpinOnce();
                } while (m_state == TRANSITIONAL_STATE);
            }
        }

        protected bool BeginSetResult() {
            if (!BeginTransit()) {
                WaitTransition();
                return false;
            }
            return true;
        }

        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
        }



        /// <summary>
        /// Выполняет обещание, сообщая об ошибке
        /// </summary>
        /// <remarks>
        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
        /// будут проигнорированы.
        /// </remarks>
        /// <param name="error">Исключение возникшее при выполнении операции</param>
        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                m_error = error;
                CompleteTransit(REJECTED_STATE);
            } else {
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        protected abstract void SignalHandler(THandler handler, int signal);

        void Signal() {
            THandler handler;
            while (TryDequeueHandler(out handler))
                SignalHandler(handler, m_state);
        }

        #endregion

        protected abstract Signal GetFulfillSignal();

        #region synchronization traits
        protected void WaitResult(int timeout) {
            if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
                throw new TimeoutException();

            if (IsRejected)
                Rethrow();
        }

        protected void Rethrow() {
            Debug.Assert(m_error != null);
            if (m_error is OperationCanceledException)
                throw new OperationCanceledException("Operation cancelled", m_error);
            else
                throw new TargetInvocationException(m_error);
        }
        #endregion

        #region handlers managment

        protected void AddHandler(THandler handler) {

            if (m_state > 1) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
                    if (m_extraHandlers == null)
                        // compare-exchange will fprotect from loosing already created queue
                        Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
                    m_extraHandlers.Enqueue(handler);
                }

                if (m_state > 1 && TryDequeueHandler(out handler))
                    // if the promise have been resolved while we was adding the handler to the queue
                    // we can't guarantee that someone is still processing it
                    // therefore we need to fetch a handler from the queue and execute it
                    // note that fetched handler may be not the one that we have added
                    // even we can fetch no handlers at all :)
                    SignalHandler(handler, m_state);
            }

        }

        bool TryDequeueHandler(out THandler handler) {
            handler = Interlocked.Exchange(ref m_handler, null);
            if (handler != null)
                return true;
            return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
        }

        #endregion

        #region IPromise implementation

        public bool IsFulfilled {
            get {
                return m_state > TRANSITIONAL_STATE;
            }
        }

        public bool IsRejected {
            get {
                return m_state == REJECTED_STATE;
            }
        }

        #endregion

        public Exception RejectReason {
            get {
                return m_error;
            }
        }

    }
}