diff Implab/AbstractPromise.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents
children f803565868a4
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/AbstractPromise.cs	Sun Jan 11 19:13:02 2015 +0300
@@ -0,0 +1,219 @@
+using System;
+using Implab.Parallels;
+using System.Threading;
+using System.Reflection;
+
+namespace Implab {
+    public abstract class AbstractPromise<THandler> { 
+
+        const int UNRESOLVED_SATE = 0;
+        const int TRANSITIONAL_STATE = 1;
+        const int SUCCEEDED_STATE = 2;
+        const int REJECTED_STATE = 3;
+        const int CANCELLED_STATE = 4;
+
+        int m_state;
+        Exception m_error;
+
+        readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
+
+        #region state managment
+        bool BeginTransit() {
+            return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
+        }
+
+        void CompleteTransit(int state) {
+            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
+                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+        }
+
+        void WaitTransition() {
+            while (m_state == TRANSITIONAL_STATE) {
+                Thread.MemoryBarrier();
+            }
+        }
+
+        protected void BeginSetResult() {
+            if (!BeginTransit()) {
+                WaitTransition();
+                if (m_state != CANCELLED_STATE)
+                    throw new InvalidOperationException("The promise is already resolved");
+            }
+        }
+
+        protected void EndSetResult() {
+            CompleteTransit(SUCCEEDED_STATE);
+            OnSuccess();
+        }
+
+
+
+        /// <summary>
+        /// Выполняет обещание, сообщая об ошибке
+        /// </summary>
+        /// <remarks>
+        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
+        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
+        /// будут проигнорированы.
+        /// </remarks>
+        /// <param name="error">Исключение возникшее при выполнении операции</param>
+        /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
+        protected void SetError(Exception error) {
+            if (BeginTransit()) {
+                m_error = error is PromiseTransientException ? error.InnerException : error;
+                CompleteTransit(REJECTED_STATE);
+                OnError();
+            } else {
+                WaitTransition();
+                if (m_state == SUCCEEDED_STATE)
+                    throw new InvalidOperationException("The promise is already resolved");
+            }
+        }
+
+        /// <summary>
+        /// Отменяет операцию, если это возможно.
+        /// </summary>
+        /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
+        protected void SetCancelled() {
+            if (BeginTransit()) {
+                CompleteTransit(CANCELLED_STATE);
+                OnCancelled();
+            }
+        }
+
+        protected abstract void SignalSuccess(THandler handler);
+
+        protected abstract void SignalError(THandler handler, Exception error);
+
+        protected abstract void SignalCancelled(THandler handler);
+
+        void OnSuccess() {
+            THandler handler;
+            while (m_handlers.TryDequeue(out handler))
+                SignalSuccess(handler);
+        }
+
+        void OnError() {
+            THandler handler;
+            while (m_handlers.TryDequeue(out handler))
+                SignalError(handler,m_error);
+        }
+
+        void OnCancelled() {
+            THandler handler;
+            while (m_handlers.TryDequeue(out handler))
+                SignalCancelled(handler);
+        }
+
+        #endregion
+
+        protected abstract void Listen(PromiseEventType events, Action handler);
+
+        #region synchronization traits
+        protected void WaitResult(int timeout) {
+            if (!IsResolved) {
+                var lk = new object();
+
+                Listen(PromiseEventType.All, () => {
+                    lock(lk) {
+                        Monitor.Pulse(lk);
+                    }
+                });
+
+                lock (lk) {
+                    while(!IsResolved) {
+                        if(!Monitor.Wait(lk,timeout))
+                            throw new TimeoutException();
+                    }
+                }
+
+            }
+            switch (m_state) {
+                case SUCCEEDED_STATE:
+                    return;
+                case CANCELLED_STATE:
+                    throw new OperationCanceledException();
+                case REJECTED_STATE:
+                    throw new TargetInvocationException(m_error);
+                default:
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
+            }
+        }
+        #endregion
+
+        #region handlers managment
+
+        protected void AddHandler(THandler handler) {
+
+            if (IsResolved) {
+                InvokeHandler(handler);
+
+            } else {
+                // the promise is in the resolved state, just invoke the handler
+                m_handlers.Enqueue(handler);
+
+
+                if (IsResolved && m_handlers.TryDequeue(out handler))
+                    // if the promise have been resolved while we was adding the handler to the queue
+                    // we can't guarantee that someone is still processing it
+                    // therefore we need to fetch a handler from the queue and execute it
+                    // note that fetched handler may be not the one that we have added
+                    // even we can fetch no handlers at all :)
+                    InvokeHandler(handler);
+            }
+        }
+
+        protected void InvokeHandler(THandler handler) {
+            switch (m_state) {
+                case SUCCEEDED_STATE:
+                    SignalSuccess(handler);
+                    break;
+                case CANCELLED_STATE:
+                    SignalCancelled(handler);
+                    break;
+                case REJECTED_STATE:
+                    SignalError(handler, m_error);
+                    break;
+                default:
+                    throw new Exception(String.Format("Invalid promise state {0}", m_state));
+            }
+        }
+
+        #endregion
+
+        #region IPromise implementation
+
+        public void Join(int timeout) {
+            WaitResult(timeout);
+        }
+
+        public void Join() {
+            WaitResult(-1);
+        }
+
+        public bool IsResolved {
+            get {
+                Thread.MemoryBarrier();
+                return m_state > 1;
+            }
+        }
+
+        public bool IsCancelled {
+            get {
+                Thread.MemoryBarrier();
+                return m_state == CANCELLED_STATE;
+            }
+        }
+
+        #endregion
+
+        #region ICancellable implementation
+
+        public void Cancel() {
+            SetCancelled();
+        }
+
+        #endregion
+    }
+}
+