changeset 19:e3935fdf59a2 promises

Promise is rewritten to use interlocked operations instead of locks
author cin
date Sun, 10 Nov 2013 00:21:33 +0400
parents 7cd4a843b4e4
children 1c3b3d518480
files Implab.Test/AsyncTests.cs Implab.v11.suo Implab/IPromise.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/MTQueue.cs Implab/Promise.cs
diffstat 6 files changed, 208 insertions(+), 205 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Sun Nov 10 00:21:33 2013 +0400
@@ -14,7 +14,7 @@
             p.Then(x => res = x);
             p.Resolve(100);
 
-            Assert.AreEqual(res, 100);
+            Assert.AreEqual(100, res);
         }
 
         [TestMethod]
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(8,100,0)) {
+            using (var pool = new WorkerPool(4,4,0)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.v11.suo has changed
--- a/Implab/IPromise.cs	Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/IPromise.cs	Sun Nov 10 00:21:33 2013 +0400
@@ -15,19 +15,6 @@
             get;
         }
 
-        /// <summary>
-        /// The current state of the promise.
-        /// </summary>
-        PromiseState State
-        {
-            get;
-        }
 
-        /// <summary>
-        /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
-        /// handler will be invoked immediatelly.
-        /// </summary>
-        /// <param name="handler">The handler</param>
-        void HandleCancelled(Action handler);
     }
 }
--- a/Implab/Parallels/ArrayTraits.cs	Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Sun Nov 10 00:21:33 2013 +0400
@@ -140,7 +140,7 @@
 
             AsyncPool.InvokeNewThread(() => {
                 for (int i = 0; i < source.Length; i++) {
-                    if(promise.State != PromiseState.Unresolved)
+                    if(promise.IsResolved)
                         break; // stop processing in case of error or cancellation
                     var idx = i;
                     semaphore.WaitOne();
--- a/Implab/Parallels/MTQueue.cs	Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Parallels/MTQueue.cs	Sun Nov 10 00:21:33 2013 +0400
@@ -42,12 +42,13 @@
                 next = first.next;
                 if (next == null) {
                     // this is the last element,
-                    // then try to update tail
+                    // then try to update the tail
                     if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-                        // this is inconsistent situation which means that the queue is empty
+                        // this is a ace condition
                         if (m_last == null)
+                            // the queue is empty
                             return false;
-                        // tail has been changed, that means that we need to restart
+                        // tail has been changed, than we need to restart
                         continue; 
                     }
 
--- a/Implab/Promise.cs	Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Promise.cs	Sun Nov 10 00:21:33 2013 +0400
@@ -3,6 +3,7 @@
 using System.Reflection;
 using System.Diagnostics;
 using System.Threading;
+using Implab.Parallels;
 
 namespace Implab {
 
@@ -48,24 +49,53 @@
     /// </remarks>
     public class Promise<T> : IPromise {
 
-        struct ResultHandlerInfo {
+        struct HandlerDescriptor {
             public ResultHandler<T> resultHandler;
             public ErrorHandler errorHandler;
+            public Action cancellHandler;
+
+            public void Resolve(T result) {
+                if (resultHandler != null)
+                    try {
+                        resultHandler(result);
+                    } catch (Exception e) {
+                        Reject(e);
+                    }
+            }
+
+            public void Reject(Exception err) {
+                if (errorHandler != null)
+                    try {
+                        errorHandler(err);
+                    } catch {
+                    }
+            }
+
+            public void Cancel() {
+                if (cancellHandler != null)
+                    try {
+                        cancellHandler();
+                    } catch {
+                    }
+            }
         }
 
+        const int UnresolvedSate = 0;
+        const int TransitionalState = 1;
+        const int ResolvedState = 2;
+        const int RejectedState = 3;
+        const int CancelledState = 4;
+
         readonly IPromise m_parent;
-
-        LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
-        LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
+        readonly bool m_cancellable;
 
-        readonly object m_lock = new Object();
-        readonly bool m_cancellable;
         int m_childrenCount = 0;
-
-        PromiseState m_state;
+        int m_state;
         T m_result;
         Exception m_error;
 
+        readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
+
         public Promise() {
             m_cancellable = true;
         }
@@ -73,8 +103,6 @@
         public Promise(IPromise parent, bool cancellable) {
             m_cancellable = cancellable;
             m_parent = parent;
-            if (parent != null)
-                parent.HandleCancelled(InternalCancel);
         }
 
         void InternalCancel() {
@@ -82,22 +110,39 @@
             Cancel(false);
         }
 
+        bool BeginTransit() {
+            return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
+        }
+
+        void CompleteTransit(int state) {
+            if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
+                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+        }
+
+        public bool IsResolved {
+            get {
+                return m_state > 1;
+            }
+        }
+
+        public bool IsCancelled {
+            get {
+                return m_state == CancelledState;
+            }
+        }
+
         /// <summary>
         /// Выполняет обещание, сообщая об успешном выполнении.
         /// </summary>
         /// <param name="result">Результат выполнения.</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Resolve(T result) {
-            lock (m_lock) {
-                if (m_state == PromiseState.Cancelled)
-                    return;
-                if (m_state != PromiseState.Unresolved)
-                    throw new InvalidOperationException("The promise is already resolved");
+            if (BeginTransit()) {
                 m_result = result;
-                m_state = PromiseState.Resolved;
-            }
-
-            OnStateChanged();
+                CompleteTransit(ResolvedState);
+                OnStateChanged();
+            } else if (m_state != CancelledState)
+                throw new InvalidOperationException("The promise is already resolved");
         }
 
         /// <summary>
@@ -111,16 +156,12 @@
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
-            lock (m_lock) {
-                if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
-                    return;
-                if (m_state != PromiseState.Unresolved)
-                    throw new InvalidOperationException("The promise is already resolved");
+            if (BeginTransit()) {
                 m_error = error;
-                m_state = PromiseState.Rejected;
-            }
-
-            OnStateChanged();
+                CompleteTransit(RejectedState);
+                OnStateChanged();
+            } else if (m_state == ResolvedState)
+                throw new InvalidOperationException("The promise is already resolved");
         }
 
         /// <summary>
@@ -144,27 +185,27 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
-
+            ResultHandler<T> resultHandler;
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
+            ErrorHandler errorHandler;
             if (error != null)
-                handlerInfo.errorHandler = x => {
+                errorHandler = x => {
                     try {
                         error(x);
                     } catch { }
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = medium.Reject;
+                errorHandler = medium.Reject;
 
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, errorHandler, medium.InternalCancel);
 
             return medium;
         }
@@ -182,27 +223,28 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
+            ResultHandler<T> resultHandler;
+            ErrorHandler errorHandler;
 
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
             if (error != null)
-                handlerInfo.errorHandler = x => {
+                errorHandler = x => {
                     try {
                         medium.Resolve(error(x));
                     } catch { }
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = medium.Reject;
+                errorHandler = medium.Reject;
 
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, errorHandler, medium.InternalCancel);
 
             return medium;
         }
@@ -214,19 +256,17 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
-
+            ResultHandler<T> resultHandler;
+            
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
-            handlerInfo.errorHandler = medium.Reject;
-
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
 
             return medium;
         }
@@ -249,15 +289,17 @@
 
             var medium = new Promise<T>(this, true);
 
-            AddHandler(new ResultHandlerInfo {
-                errorHandler = e => {
+            AddHandler(
+                null,
+                e => {
                     try {
                         medium.Resolve(handler(e));
                     } catch (Exception e2) {
                         medium.Reject(e2);
                     }
-                }
-            });
+                },
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -268,8 +310,8 @@
 
             var medium = new Promise<T>();
 
-            AddHandler(new ResultHandlerInfo {
-                resultHandler = x => {
+            AddHandler(
+                x => {
                     // to avoid handler being called multiple times we handle exception by ourselfs
                     try {
                         handler();
@@ -278,13 +320,16 @@
                         medium.Reject(e);
                     }
                 },
-                errorHandler = x => {
+
+                e => {
                     try {
                         handler();
                     } catch { }
-                    medium.Reject(x);
-                }
-            });
+                    medium.Reject(e);
+                },
+
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -304,17 +349,22 @@
             // создаем прицепленное обещание
             var chained = new Promise<TNew>();
 
-            AddHandler(new ResultHandlerInfo() {
-                resultHandler = result => chained.Resolve(mapper(result)),
-                errorHandler = delegate(Exception e) {
-                    if (error != null)
-                        try {
-                            error(e);
-                        } catch { }
-                    // в случае ошибки нужно передать исключение дальше по цепочке
-                    chained.Reject(e);
-                }
-            });
+            ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
+            ErrorHandler errorHandler = delegate(Exception e) {
+                if (error != null)
+                    try {
+                        error(e);
+                    } catch { }
+                // в случае ошибки нужно передать исключение дальше по цепочке
+                chained.Reject(e);
+            };
+
+
+            AddHandler(
+                resultHandler,
+                errorHandler,
+                chained.InternalCancel
+            );
 
             return chained;
         }
@@ -341,27 +391,32 @@
             // передать через него результаты работы.
             var medium = new Promise<TNew>(this, true);
 
-            AddHandler(new ResultHandlerInfo {
-                resultHandler = delegate(T result) {
-                    if (medium.State == PromiseState.Cancelled)
-                        return;
+            ResultHandler<T> resultHandler = delegate(T result) {
+                if (medium.IsCancelled)
+                    return;
 
-                    var promise = chained(result);
+                var promise = chained(result);
 
-                    // notify chained operation that it's not needed
-                    medium.Cancelled(() => promise.Cancel());
-                    promise.Then(
-                        x => medium.Resolve(x),
-                        e => medium.Reject(e)
-                    );
-                },
-                errorHandler = delegate(Exception e) {
-                    if (error != null)
-                        error(e);
-                    // в случае ошибки нужно передать исключение дальше по цепочке
-                    medium.Reject(e);
-                }
-            });
+                // notify chained operation that it's not needed
+                medium.Cancelled(() => promise.Cancel());
+                promise.Then(
+                    x => medium.Resolve(x),
+                    e => medium.Reject(e)
+                );
+            };
+
+            ErrorHandler errorHandler = delegate(Exception e) {
+                if (error != null)
+                    error(e);
+                // в случае ошибки нужно передать исключение дальше по цепочке
+                medium.Reject(e);
+            };
+
+            AddHandler(
+                resultHandler,
+                errorHandler,
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -371,19 +426,19 @@
         }
 
         public Promise<T> Cancelled(Action handler) {
-            if (handler == null)
-                return this;
-            lock (m_lock) {
-                if (m_state == PromiseState.Unresolved)
-                    m_cancelHandlers.AddLast(handler);
-                else if (m_state == PromiseState.Cancelled)
-                    handler();
-            }
+            AddHandler(null, null, handler);
             return this;
         }
 
-        public void HandleCancelled(Action handler) {
-            Cancelled(handler);
+        public Promise<T> Finally(Action handler) {
+            if (handler == null)
+                throw new ArgumentNullException("handler");
+            AddHandler(
+                x => handler(),
+                e => handler(),
+                handler
+            );
+            return this;
         }
 
         /// <summary>
@@ -415,15 +470,15 @@
             if (!evt.WaitOne(timeout, true))
                 throw new TimeoutException();
 
-            switch (State) {
-                case PromiseState.Resolved:
+            switch (m_state) {
+                case ResolvedState:
                     return m_result;
-                case PromiseState.Cancelled:
+                case CancelledState:
                     throw new OperationCanceledException();
-                case PromiseState.Rejected:
+                case RejectedState:
                     throw new TargetInvocationException(m_error);
                 default:
-                    throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
             }
         }
 
@@ -431,40 +486,45 @@
             return Join(Timeout.Infinite);
         }
 
-        void AddHandler(ResultHandlerInfo handler) {
-            bool invokeRequired = false;
+        void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
+            Interlocked.Increment(ref m_childrenCount);
+
+            HandlerDescriptor handler = new HandlerDescriptor {
+                resultHandler = success,
+                errorHandler = error,
+                cancellHandler = cancel
+            };
 
-            lock (m_lock) {
-                m_childrenCount++;
-                if (m_state == PromiseState.Unresolved) {
-                    m_resultHandlers.AddLast(handler);
-                } else
-                    invokeRequired = true;
+            bool queued;
+
+            if (!IsResolved) {
+                m_handlers.Enqueue(handler);
+                queued = true;
+            } else {
+                // the promise is in resolved state, just invoke the handled with minimum overhead
+                queued = false;
+                InvokeHandler(handler);
             }
 
-            // обработчики не должны блокировать сам объект
-            if (invokeRequired)
+            if (queued && IsResolved && m_handlers.TryDequeue(out handler))
+                // if the promise have been resolved while we was adding handler to the queue
+                // we can't guarantee that someone is still processing it
+                // therefore we will fetch a handler from the queue and execute it
+                // note that fetched handler may be not the one we have added
                 InvokeHandler(handler);
+
         }
 
-        void InvokeHandler(ResultHandlerInfo handler) {
+        void InvokeHandler(HandlerDescriptor handler) {
             switch (m_state) {
-                case PromiseState.Resolved:
-                    try {
-                        if (handler.resultHandler != null)
-                            handler.resultHandler(m_result);
-                    } catch (Exception e) {
-                        try {
-                            if (handler.errorHandler != null)
-                                handler.errorHandler(e);
-                        } catch { }
-                    }
+                case ResolvedState:
+                    handler.Resolve(m_result);
                     break;
-                case PromiseState.Rejected:
-                    try {
-                        if (handler.errorHandler != null)
-                            handler.errorHandler(m_error);
-                    } catch { }
+                case RejectedState:
+                    handler.Reject(m_error);
+                    break;
+                case CancelledState:
+                    handler.Cancel();
                     break;
                 default:
                     // do nothing
@@ -473,76 +533,31 @@
         }
 
         protected virtual void OnStateChanged() {
-            switch (m_state) {
-                case PromiseState.Resolved:
-                    foreach (var resultHandlerInfo in m_resultHandlers)
-                        try {
-                            if (resultHandlerInfo.resultHandler != null)
-                                resultHandlerInfo.resultHandler(m_result);
-                        } catch (Exception e) {
-                            try {
-                                if (resultHandlerInfo.errorHandler != null)
-                                    resultHandlerInfo.errorHandler(e);
-                            } catch { }
-                        }
-                    break;
-                case PromiseState.Cancelled:
-                    foreach (var cancelHandler in m_cancelHandlers)
-                        cancelHandler();
-                    break;
-                case PromiseState.Rejected:
-                    foreach (var resultHandlerInfo in m_resultHandlers)
-                        try {
-                            if (resultHandlerInfo.errorHandler != null)
-                                resultHandlerInfo.errorHandler(m_error);
-                        } catch { }
-                    break;
-                default:
-                    throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
-            }
-
-            m_resultHandlers = null;
-            m_cancelHandlers = null;
+            HandlerDescriptor handler;
+            while (m_handlers.TryDequeue(out handler))
+                InvokeHandler(handler);
         }
 
 
 
         public bool IsExclusive {
             get {
-                lock (m_lock) {
-                    return m_childrenCount <= 1;
-                }
-            }
-        }
-
-        public PromiseState State {
-            get {
-                lock (m_lock) {
-                    return m_state;
-                }
+                return m_childrenCount <= 1;
             }
         }
 
         protected bool Cancel(bool dependencies) {
-            bool result;
-
-            lock (m_lock) {
-                if (m_state == PromiseState.Unresolved) {
-                    m_state = PromiseState.Cancelled;
-                    result = true;
-                } else {
-                    result = false;
-                }
-            }
-
-            if (result)
+            if (BeginTransit()) {
+                CompleteTransit(CancelledState);
                 OnStateChanged();
 
-            if (dependencies && m_parent != null && m_parent.IsExclusive) {
-                m_parent.Cancel();
+                if (dependencies && m_parent != null && m_parent.IsExclusive)
+                    m_parent.Cancel();
+
+                return true;
+            } else {
+                return false;
             }
-
-            return result;
         }
 
     }