diff Implab/AbstractEvent.cs @ 144:8c0b95069066 v2

DRAFT: refactoring
author cin
date Fri, 06 Mar 2015 15:45:26 +0300
parents
children 706fccb85524
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/AbstractEvent.cs	Fri Mar 06 15:45:26 2015 +0300
@@ -0,0 +1,350 @@
+using System;
+using Implab.Parallels;
+using System.Threading;
+using System.Reflection;
+
+namespace Implab {
+    public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable { 
+
+        const int UNRESOLVED_SATE = 0;
+        const int TRANSITIONAL_STATE = 1;
+        const int SUCCEEDED_STATE = 2;
+        const int REJECTED_STATE = 3;
+        const int CANCELLED_STATE = 4;
+
+        const int CANCEL_NOT_REQUESTED = 0;
+        const int CANCEL_REQUESTING = 1;
+        const int CANCEL_REQUESTED = 2;
+
+        const int RESERVED_HANDLERS_COUNT = 4;
+
+        int m_state;
+        Exception m_error;
+        int m_handlersCount;
+
+        readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
+        MTQueue<THandler> m_extraHandlers;
+        int m_handlerPointer = -1;
+        int m_handlersCommited;
+
+        int m_cancelRequest;
+        Exception m_cancelationReason;
+        MTQueue<Action<Exception>> m_cancelationHandlers;
+
+
+        #region state managment
+        bool BeginTransit() {
+            return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
+        }
+
+        void CompleteTransit(int state) {
+            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
+                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+        }
+
+        void WaitTransition() {
+            while (m_state == TRANSITIONAL_STATE) {
+                Thread.MemoryBarrier();
+            }
+        }
+
+        protected bool BeginSetResult() {
+            if (!BeginTransit()) {
+                WaitTransition();
+                if (m_state != CANCELLED_STATE)
+                    throw new InvalidOperationException("The promise is already resolved");
+                return false;
+            }
+            return true;
+        }
+
+        protected void EndSetResult() {
+            CompleteTransit(SUCCEEDED_STATE);
+            OnSuccess();
+        }
+
+
+
+        /// <summary>
+        /// Выполняет обещание, сообщая об ошибке
+        /// </summary>
+        /// <remarks>
+        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
+        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
+        /// будут проигнорированы.
+        /// </remarks>
+        /// <param name="error">Исключение возникшее при выполнении операции</param>
+        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
+        protected void SetError(Exception error) {
+            if (BeginTransit()) {
+                if (error is OperationCanceledException) {
+                    CompleteTransit(CANCELLED_STATE);
+                    m_error = error.InnerException;
+                    OnCancelled();
+                } else {
+                    m_error = error is PromiseTransientException ? error.InnerException : error;
+                    CompleteTransit(REJECTED_STATE);
+                    OnError();
+                }
+            } else {
+                WaitTransition();
+                if (m_state == SUCCEEDED_STATE)
+                    throw new InvalidOperationException("The promise is already resolved");
+            }
+        }
+
+        /// <summary>
+        /// Отменяет операцию, если это возможно.
+        /// </summary>
+        /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
+        protected void SetCancelled(Exception reason) {
+            if (BeginTransit()) {
+                m_error = reason;
+                CompleteTransit(CANCELLED_STATE);
+                OnCancelled();
+            }
+        }
+
+        protected abstract void SignalSuccess(THandler handler);
+
+        protected abstract void SignalError(THandler handler, Exception error);
+
+        protected abstract void SignalCancelled(THandler handler, Exception reason);
+
+        void OnSuccess() {
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalSuccess(m_handlers[slot]);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalSuccess(handler);
+            }
+        }
+
+        void OnError() {
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalError(m_handlers[slot],m_error);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalError(handler, m_error);
+            }
+        }
+
+        void OnCancelled() {
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalCancelled(m_handlers[slot], m_error);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalCancelled(handler, m_error);
+            }
+        }
+
+        #endregion
+
+        protected abstract Signal GetResolveSignal();
+
+        #region synchronization traits
+        protected void WaitResult(int timeout) {
+            if (!IsResolved)
+                GetResolveSignal().Wait(timeout);
+
+            switch (m_state) {
+                case SUCCEEDED_STATE:
+                    return;
+                case CANCELLED_STATE:
+                    throw new OperationCanceledException();
+                case REJECTED_STATE:
+                    throw new TargetInvocationException(m_error);
+                default:
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
+            }
+        }
+        #endregion
+
+        #region handlers managment
+
+        protected void AddHandler(THandler handler) {
+
+            if (m_state > 1) {
+                // the promise is in the resolved state, just invoke the handler
+                InvokeHandler(handler);
+            } else {
+                var slot = Interlocked.Increment(ref m_handlersCount) - 1;
+
+                if (slot < RESERVED_HANDLERS_COUNT) {
+
+                    m_handlers[slot] = handler;
+
+                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
+                    }
+
+                    if (m_state > 1) {
+                        do {
+                            var hp = m_handlerPointer;
+                            slot = hp + 1;
+                            if (slot < m_handlersCommited) {
+                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
+                                    continue;
+                                InvokeHandler(m_handlers[slot]);
+                            }
+                            break;
+                        } while(true);
+                    }
+                } else {
+                    if (slot == RESERVED_HANDLERS_COUNT) {
+                        m_extraHandlers = new MTQueue<THandler>();
+                    } else {
+                        while (m_extraHandlers == null)
+                            Thread.MemoryBarrier();
+                    }
+
+                    m_extraHandlers.Enqueue(handler);
+
+                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
+                    // if the promise have been resolved while we was adding the handler to the queue
+                    // we can't guarantee that someone is still processing it
+                    // therefore we need to fetch a handler from the queue and execute it
+                    // note that fetched handler may be not the one that we have added
+                    // even we can fetch no handlers at all :)
+                    InvokeHandler(handler);
+                }
+            }
+        }
+
+        protected void InvokeHandler(THandler handler) {
+            switch (m_state) {
+                case SUCCEEDED_STATE:
+                    SignalSuccess(handler);
+                    break;
+                case CANCELLED_STATE:
+                    SignalCancelled(handler, m_error);
+                    break;
+                case REJECTED_STATE:
+                    SignalError(handler, m_error);
+                    break;
+                default:
+                    throw new Exception(String.Format("Invalid promise state {0}", m_state));
+            }
+        }
+
+        #endregion
+
+        #region IPromise implementation
+
+        public bool IsResolved {
+            get {
+                Thread.MemoryBarrier();
+                return m_state > 1;
+            }
+        }
+
+        public bool IsCancelled {
+            get {
+                Thread.MemoryBarrier();
+                return m_state == CANCELLED_STATE;
+            }
+        }
+
+        #endregion
+
+        public Exception Error {
+            get {
+                return m_error;
+            }
+        }
+
+        public bool AcceptIfRequested() {
+            if (IsCancelRequested)
+                CancelOperation(CancelReason);
+        }
+
+        public virtual void CancelOperation(Exception reason) {
+            SetCancelled(reason);
+        }
+
+        public void CancelationRequested(Action<Exception> handler) {
+            Safe.ArgumentNotNull(handler, "handler");
+            if (IsCancelRequested)
+                handler(CancelReason);
+
+            if (m_cancelationHandlers == null)
+                Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);
+
+            m_cancelationHandlers.Enqueue(handler);
+
+            if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler))
+                // TryDeque implies MemoryBarrier()
+                handler(m_cancelationReason);
+        }
+
+        public bool IsCancelRequested {
+            get {
+                do {
+                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
+                        return false;
+                    if (m_cancelRequest == CANCEL_REQUESTED)
+                        return true;
+                    Thread.MemoryBarrier();
+                } while(true);
+            }
+        }
+
+        public Exception CancelReason {
+            get {
+                do {
+                    Thread.MemoryBarrier();
+                } while(m_cancelRequest == CANCEL_REQUESTING);
+
+                return m_cancelationReason;
+            }
+        }
+
+        #region ICancellable implementation
+
+        public void Cancel() {
+            Cancel(null);
+        }
+
+        public void Cancel(Exception reason) {
+            if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING)) {
+                m_cancelationReason = reason;
+                m_cancelRequest = CANCEL_REQUESTED;
+                if (m_cancelationHandlers != null) {
+                    Action<Exception> handler;
+                    while (m_cancelationHandlers.TryDequeue(out handler))
+                        handler(m_cancelationReason);
+                }
+            }
+        }
+
+        #endregion
+    }
+}
+