diff Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3

Working on promises
author cin
date Wed, 24 Jan 2018 03:03:21 +0300
parents fa6cbf4d8841
children b1e0ffdf3451
line wrap: on
line diff
--- a/Implab/AbstractEvent.cs	Tue Jan 23 19:39:21 2018 +0300
+++ b/Implab/AbstractEvent.cs	Wed Jan 24 03:03:21 2018 +0300
@@ -2,56 +2,50 @@
 using Implab.Parallels;
 using System.Threading;
 using System.Reflection;
+using System.Diagnostics;
 
 namespace Implab {
-    public abstract class AbstractEvent<THandler> : ICancellable { 
+    public abstract class AbstractEvent<THandler> where THandler : class {
 
-        const int UNRESOLVED_SATE = 0;
-        const int TRANSITIONAL_STATE = 1;
+        const int PENDING_SATE = 0;
+        protected const int TRANSITIONAL_STATE = 1;
+
         protected const int SUCCEEDED_STATE = 2;
         protected const int REJECTED_STATE = 3;
-        protected const int CANCELLED_STATE = 4;
 
-        const int CANCEL_NOT_REQUESTED = 0;
-        const int CANCEL_REQUESTING = 1;
-        const int CANCEL_REQUESTED = 2;
-
-        const int RESERVED_HANDLERS_COUNT = 4;
-
-        int m_state;
+        volatile int m_state;
         Exception m_error;
-        int m_handlersCount;
 
-        //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
-        THandler[] m_handlers;
+        THandler m_handler;
         SimpleAsyncQueue<THandler> m_extraHandlers;
-        int m_handlerPointer = -1;
-        int m_handlersCommited;
-
-        int m_cancelRequest;
-        Exception m_cancelationReason;
 
         #region state managment
-        bool BeginTransit() {
-            return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
+        protected bool BeginTransit() {
+            return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
         }
 
-        void CompleteTransit(int state) {
+        protected void CompleteTransit(int state) {
+#if DEBUG
             if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+#else
+            m_state = state;
+#endif
+            Signal();
         }
 
-        void WaitTransition() {
-            while (m_state == TRANSITIONAL_STATE) {
-                Thread.MemoryBarrier();
+        protected void WaitTransition() {
+            if (m_state == TRANSITIONAL_STATE) {
+                SpinWait spin;
+                do {
+                    spin.SpinOnce();
+                } while (m_state == TRANSITIONAL_STATE);
             }
         }
 
         protected bool BeginSetResult() {
             if (!BeginTransit()) {
                 WaitTransition();
-                if (m_state != CANCELLED_STATE)
-                    throw new InvalidOperationException("The promise is already resolved");
                 return false;
             }
             return true;
@@ -59,7 +53,6 @@
 
         protected void EndSetResult() {
             CompleteTransit(SUCCEEDED_STATE);
-            Signal();
         }
 
 
@@ -78,8 +71,6 @@
             if (BeginTransit()) {
                 m_error = error;
                 CompleteTransit(REJECTED_STATE);
-
-                Signal();
             } else {
                 WaitTransition();
                 if (m_state == SUCCEEDED_STATE)
@@ -87,58 +78,33 @@
             }
         }
 
-        /// <summary>
-        /// Отменяет операцию, если это возможно.
-        /// </summary>
-        /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
-        protected void SetCancelled(Exception reason) {
-            if (BeginTransit()) {
-                m_error = reason;
-                CompleteTransit(CANCELLED_STATE);
-                Signal();
-            }
-        }
-
         protected abstract void SignalHandler(THandler handler, int signal);
 
         void Signal() {
-            var hp = m_handlerPointer;
-            var slot = hp +1 ;
-            while (slot < m_handlersCommited) {
-                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
-                    SignalHandler(m_handlers[slot], m_state);
-                }
-                hp = m_handlerPointer;
-                slot = hp +1 ;
-            }
-
-
-            if (m_extraHandlers != null) {
-                THandler handler;
-                while (m_extraHandlers.TryDequeue(out handler))
-                    SignalHandler(handler, m_state);
-            }
+            THandler handler;
+            while (TryDequeueHandler(out handler))
+                SignalHandler(handler, m_state);
         }
 
         #endregion
 
-        protected abstract Signal GetResolveSignal();
+        protected abstract Signal GetFulfillSignal();
 
         #region synchronization traits
         protected void WaitResult(int timeout) {
-            if (!(IsResolved || GetResolveSignal().Wait(timeout)))
+            if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
                 throw new TimeoutException();
 
-            switch (m_state) {
-                case SUCCEEDED_STATE:
-                    return;
-                case CANCELLED_STATE:
-                    throw new OperationCanceledException("The operation has been cancelled", m_error);
-                case REJECTED_STATE:
-                    throw new TargetInvocationException(m_error);
-                default:
-                    throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state));
-            }
+            if (IsRejected)
+                Rethrow();
+        }
+
+        protected void Rethrow() {
+            Debug.Assert(m_error != null);
+            if (m_error is OperationCanceledException)
+                throw new OperationCanceledException("Operation cancelled", m_error);
+            else
+                throw new TargetInvocationException(m_error);
         }
         #endregion
 
@@ -150,149 +116,55 @@
                 // the promise is in the resolved state, just invoke the handler
                 SignalHandler(handler, m_state);
             } else {
-                var slot = Interlocked.Increment(ref m_handlersCount) - 1;
-
-                if (slot < RESERVED_HANDLERS_COUNT) {
-
-                    if (slot == 0) {
-                        m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
-                    } else {
-                        while (m_handlers == null)
-                            Thread.MemoryBarrier();
-                    }
-
-                    m_handlers[slot] = handler;
-
-                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
-                    }
+                if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
+                    if (m_extraHandlers == null)
+                        // compare-exchange will fprotect from loosing already created queue
+                        Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
+                    m_extraHandlers.Enqueue(handler);
+                }
 
-                    if (m_state > 1) {
-                        do {
-                            var hp = m_handlerPointer;
-                            slot = hp + 1;
-                            if (slot < m_handlersCommited) {
-                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
-                                    continue;
-                                SignalHandler(m_handlers[slot], m_state);
-                            }
-                            break;
-                        } while(true);
-                    }
-                } else {
-                    if (slot == RESERVED_HANDLERS_COUNT) {
-                        m_extraHandlers = new SimpleAsyncQueue<THandler>();
-                    } else {
-                        while (m_extraHandlers == null)
-                            Thread.MemoryBarrier();
-                    }
-
-                    m_extraHandlers.Enqueue(handler);
-
-                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
+                if (m_state > 1 && TryDequeueHandler(out handler))
                     // if the promise have been resolved while we was adding the handler to the queue
                     // we can't guarantee that someone is still processing it
                     // therefore we need to fetch a handler from the queue and execute it
                     // note that fetched handler may be not the one that we have added
                     // even we can fetch no handlers at all :)
-                        SignalHandler(handler, m_state);
-                }
+                    SignalHandler(handler, m_state);
             }
+
+        }
+
+        bool TryDequeueHandler(out THandler handler) {
+            handler = Interlocked.Exchange(ref m_handler, null);
+            if (handler != null)
+                return true;
+            return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
         }
 
         #endregion
 
         #region IPromise implementation
 
-        public bool IsResolved {
+        public bool IsFulfilled {
             get {
-                Thread.MemoryBarrier();
-                return m_state > 1;
+                return m_state > TRANSITIONAL_STATE;
             }
         }
 
-        public bool IsCancelled {
+        public bool IsRejected {
             get {
-                Thread.MemoryBarrier();
-                return m_state == CANCELLED_STATE;
+                return m_state == REJECTED_STATE;
             }
         }
 
         #endregion
 
-        public Exception Error {
+        public Exception RejectReason {
             get {
                 return m_error;
             }
         }
 
-        public bool CancelOperationIfRequested() {
-            if (IsCancellationRequested) {
-                CancelOperation(CancellationReason);
-                return true;
-            }
-            return false;
-        }
-
-        public virtual void CancelOperation(Exception reason) {
-            SetCancelled(reason);
-        }
-
-        public void CancellationRequested(Action<Exception> handler) {
-            Safe.ArgumentNotNull(handler, "handler");
-            if (IsCancellationRequested)
-                handler(CancellationReason);
-
-            if (m_cancelationHandlers == null)
-                Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
-
-            m_cancelationHandlers.Enqueue(handler);
-
-            if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
-                // TryDeque implies MemoryBarrier()
-                handler(m_cancelationReason);
-        }
-
-        public bool IsCancellationRequested {
-            get {
-                do {
-                    if (m_cancelRequest == CANCEL_NOT_REQUESTED)
-                        return false;
-                    if (m_cancelRequest == CANCEL_REQUESTED)
-                        return true;
-                    Thread.MemoryBarrier();
-                } while(true);
-            }
-        }
-
-        public Exception CancellationReason {
-            get {
-                do {
-                    Thread.MemoryBarrier();
-                } while(m_cancelRequest == CANCEL_REQUESTING);
-
-                return m_cancelationReason;
-            }
-        }
-
-        #region ICancellable implementation
-
-        public void Cancel() {
-            Cancel(null);
-        }
-
-        public void Cancel(Exception reason) {
-            if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
-                m_cancelationReason = reason;
-                m_cancelRequest = CANCEL_REQUESTED;
-                if (m_cancelationHandlers != null) {
-                    Action<Exception> handler;
-                    while (m_cancelationHandlers.TryDequeue(out handler))
-                        handler(m_cancelationReason);
-                }
-            }
-        }
-
-        #endregion
     }
 }