view Implab/AbstractPromise.cs @ 127:d86da8d2d4c3 v2

fixed AsyncQueue iterator
author cin
date Tue, 27 Jan 2015 18:18:29 +0300
parents f803565868a4
children 671f60cd0250
line wrap: on
line source

using System;
using Implab.Parallels;
using System.Threading;
using System.Reflection;

namespace Implab {
    public abstract class AbstractPromise<THandler> { 

        const int UNRESOLVED_SATE = 0;
        const int TRANSITIONAL_STATE = 1;
        const int SUCCEEDED_STATE = 2;
        const int REJECTED_STATE = 3;
        const int CANCELLED_STATE = 4;

        const int RESERVED_HANDLERS_COUNT = 4;

        int m_state;
        Exception m_error;
        int m_handlersCount;

        readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
        MTQueue<THandler> m_extraHandlers;
        int m_handlerPointer = -1;
        int m_handlersCommited;

        #region state managment
        bool BeginTransit() {
            return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
        }

        void CompleteTransit(int state) {
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        void WaitTransition() {
            while (m_state == TRANSITIONAL_STATE) {
                Thread.MemoryBarrier();
            }
        }

        protected void BeginSetResult() {
            if (!BeginTransit()) {
                WaitTransition();
                if (m_state != CANCELLED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
            OnSuccess();
        }



        /// <summary>
        /// Выполняет обещание, сообщая об ошибке
        /// </summary>
        /// <remarks>
        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
        /// будут проигнорированы.
        /// </remarks>
        /// <param name="error">Исключение возникшее при выполнении операции</param>
        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                m_error = error is PromiseTransientException ? error.InnerException : error;
                CompleteTransit(REJECTED_STATE);
                OnError();
            } else {
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Отменяет операцию, если это возможно.
        /// </summary>
        /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
        protected void SetCancelled() {
            if (BeginTransit()) {
                CompleteTransit(CANCELLED_STATE);
                OnCancelled();
            }
        }

        protected abstract void SignalSuccess(THandler handler);

        protected abstract void SignalError(THandler handler, Exception error);

        protected abstract void SignalCancelled(THandler handler);

        void OnSuccess() {
            var hp = m_handlerPointer;
            var slot = hp +1 ;
            while (slot < m_handlersCommited) {
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalSuccess(m_handlers[slot]);
                }
                hp = m_handlerPointer;
                slot = hp +1 ;
            }


            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalSuccess(handler);
            }
        }

        void OnError() {
            var hp = m_handlerPointer;
            var slot = hp +1 ;
            while (slot < m_handlersCommited) {
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalError(m_handlers[slot],m_error);
                }
                hp = m_handlerPointer;
                slot = hp +1 ;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalError(handler, m_error);
            }
        }

        void OnCancelled() {
            var hp = m_handlerPointer;
            var slot = hp +1 ;
            while (slot < m_handlersCommited) {
                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
                    SignalCancelled(m_handlers[slot]);
                }
                hp = m_handlerPointer;
                slot = hp +1 ;
            }

            if (m_extraHandlers != null) {
                THandler handler;
                while (m_extraHandlers.TryDequeue(out handler))
                    SignalCancelled(handler);
            }
        }

        #endregion

        protected abstract void Listen(PromiseEventType events, Action handler);

        #region synchronization traits
        protected void WaitResult(int timeout) {
            if (!IsResolved) {
                var lk = new object();

                Listen(PromiseEventType.All, () => {
                    lock(lk) {
                        Monitor.Pulse(lk);
                    }
                });

                lock (lk) {
                    while(!IsResolved) {
                        if(!Monitor.Wait(lk,timeout))
                            throw new TimeoutException();
                    }
                }

            }
            switch (m_state) {
                case SUCCEEDED_STATE:
                    return;
                case CANCELLED_STATE:
                    throw new OperationCanceledException();
                case REJECTED_STATE:
                    throw new TargetInvocationException(m_error);
                default:
                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
            }
        }
        #endregion

        #region handlers managment

        protected void AddHandler(THandler handler) {

            if (m_state > 1) {
                // the promise is in the resolved state, just invoke the handler
                InvokeHandler(handler);
            } else {
                var slot = Interlocked.Increment(ref m_handlersCount) - 1;

                if (slot < RESERVED_HANDLERS_COUNT) {
                    m_handlers[slot] = handler;

                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
                    }

                    if (m_state > 1) {
                        do {
                            var hp = m_handlerPointer;
                            slot = hp + 1;
                            if (slot < m_handlersCommited) {
                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
                                    continue;
                                InvokeHandler(m_handlers[slot]);
                            }
                            break;
                        } while(true);
                    }
                } else {
                    if (slot == RESERVED_HANDLERS_COUNT) {
                        m_extraHandlers = new MTQueue<THandler>();
                    } else {
                        while (m_extraHandlers == null)
                            Thread.MemoryBarrier();
                    }

                    m_extraHandlers.Enqueue(handler);

                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                    // if the promise have been resolved while we was adding the handler to the queue
                    // we can't guarantee that someone is still processing it
                    // therefore we need to fetch a handler from the queue and execute it
                    // note that fetched handler may be not the one that we have added
                    // even we can fetch no handlers at all :)
                    InvokeHandler(handler);
                }
            }
        }

        protected void InvokeHandler(THandler handler) {
            switch (m_state) {
                case SUCCEEDED_STATE:
                    SignalSuccess(handler);
                    break;
                case CANCELLED_STATE:
                    SignalCancelled(handler);
                    break;
                case REJECTED_STATE:
                    SignalError(handler, m_error);
                    break;
                default:
                    throw new Exception(String.Format("Invalid promise state {0}", m_state));
            }
        }

        #endregion

        #region IPromise implementation

        public void Join(int timeout) {
            WaitResult(timeout);
        }

        public void Join() {
            WaitResult(-1);
        }

        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                return m_state > 1;
            }
        }

        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                return m_state == CANCELLED_STATE;
            }
        }

        #endregion

        #region ICancellable implementation

        public void Cancel() {
            SetCancelled();
        }

        #endregion
    }
}