changeset 106:d4e38929ce36 v2

promises refactoring
author cin
date Mon, 10 Nov 2014 18:00:28 +0300
parents 4d308952fd5e
children f5220e5472ef
files Implab.Fx/AnimationHelpers.cs Implab.Fx/ControlBoundPromise.cs Implab.Test/AsyncTests.cs Implab/IPromiseT.cs Implab/Implab.csproj Implab/Parallels/MTCustomQueue.cs Implab/Parallels/MTCustomQueueNode.cs Implab/Promise.cs Implab/SyncContextPromise.cs
diffstat 9 files changed, 252 insertions(+), 144 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Fx/AnimationHelpers.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab.Fx/AnimationHelpers.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -42,7 +42,13 @@
         {
             var anim = ctl.AnimateTransparency(0);
 
-            return anim.Play().DispatchToControl(ctl).Then(frm => frm.Close());
+            return anim
+                .Play()
+                .DispatchToControl(ctl)
+                .Then(frm => {
+                    frm.Close();
+                    return frm;
+                });
         }
 
         public static IPromise<T> OverlayFadeIn<T>(this Form that, T overlay) where T : Form
--- a/Implab.Fx/ControlBoundPromise.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab.Fx/ControlBoundPromise.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -19,9 +19,9 @@
             m_target = target;
         }
 
-        protected override void InvokeHandler(HandlerDescriptor handler) {
+        protected override void InvokeHandler(AbstractHandler handler) {
             if (m_target.InvokeRequired)
-                m_target.BeginInvoke(new Action<HandlerDescriptor>(base.InvokeHandler), handler);
+                m_target.BeginInvoke(new Action<AbstractHandler>(base.InvokeHandler), handler);
             else
                 base.InvokeHandler(handler);
         }
--- a/Implab.Test/AsyncTests.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab.Test/AsyncTests.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -426,8 +426,11 @@
                     hemStarted.Set();
                     // запускаем две асинхронные операции
                     var result = PromiseHelper
-                        .Sleep(10000, "HEM ENABLED!!!")
-                        .Then(s => pSurvive.Resolve(false));
+                        .Sleep(100000000, "HEM ENABLED!!!")
+                        .Then(s => {
+                            pSurvive.Resolve(false);
+                            return s;
+                        });
 
                     result
                         .Cancelled(() => pSurvive.Resolve(true));
--- a/Implab/IPromiseT.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab/IPromiseT.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -13,12 +13,6 @@
 
         void On(Action<T> success);
 
-        IPromise<T> Then(Action<T> success, Func<Exception,T> error, Action cancel);
-
-        IPromise<T> Then(Action<T> success, Func<Exception,T> error);
-
-        IPromise<T> Then(Action<T> success);
-
         IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Action cancel);
 
         IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
--- a/Implab/Implab.csproj	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab/Implab.csproj	Mon Nov 10 18:00:28 2014 +0300
@@ -148,6 +148,8 @@
     <Compile Include="IComponentContainer.cs" />
     <Compile Include="MTComponentContainer.cs" />
     <Compile Include="PromiseEventType.cs" />
+    <Compile Include="Parallels\MTCustomQueue.cs" />
+    <Compile Include="Parallels\MTCustomQueueNode.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
   <ItemGroup />
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/MTCustomQueue.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -0,0 +1,135 @@
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Collections;
+
+namespace Implab.Parallels {
+    public class MTCustomQueue<TNode> : IEnumerable<TNode> where TNode : MTCustomQueueNode<TNode> {
+        TNode m_first;
+        TNode m_last;
+
+        public void Enqueue(TNode next) {
+            Thread.MemoryBarrier();
+
+            var last = m_last;
+
+            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
+            // to ensure that the next node is completely constructed
+            while (last != Interlocked.CompareExchange(ref m_last, next, last))
+                last = m_last;
+
+            if (last != null)
+                last.next = next;
+            else
+                m_first = next;
+        }
+
+        public bool TryDequeue(out TNode node) {
+            TNode first;
+            TNode next;
+            node = null;
+
+            Thread.MemoryBarrier();
+            do {
+                first = m_first;
+                if (first == null)
+                    return false;
+                next = first.next;
+                if (next == null) {
+                    // this is the last element,
+                    // then try to update the tail
+                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                        // this is the race condition
+                        if (m_last == null)
+                            // the queue is empty
+                            return false;
+                        // tail has been changed, we need to restart
+                        continue; 
+                    }
+
+                    // tail succesfully updated and first.next will never be changed
+                    // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null
+                    // however the parallel writer may update the m_first since the m_last is null
+
+                    // so we need to fix inconsistency by setting m_first to null or if it has been
+                    // updated by the writer already then we should just to give up
+                    Interlocked.CompareExchange(ref m_first, null, first);
+                    break;
+
+                }
+                if (first == Interlocked.CompareExchange(ref m_first, next, first))
+                    // head succesfully updated
+                    break;
+            } while (true);
+
+            node = first;
+            return true;
+        }
+
+        #region IEnumerable implementation
+
+        class Enumerator : IEnumerator<TNode> {
+            TNode m_current;
+            TNode m_first;
+
+            public Enumerator(TNode first) {
+                m_first = first;
+            }
+
+            #region IEnumerator implementation
+
+            public bool MoveNext() {
+                m_current = m_current == null ? m_first : m_current.next;
+                return m_current != null;
+            }
+
+            public void Reset() {
+                m_current = null;
+            }
+
+            object IEnumerator.Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current;
+                }
+            }
+
+            #endregion
+
+            #region IDisposable implementation
+
+            public void Dispose() {
+            }
+
+            #endregion
+
+            #region IEnumerator implementation
+
+            public TNode Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current;
+                }
+            }
+
+            #endregion
+        }
+
+        public IEnumerator<TNode> GetEnumerator() {
+            return new Enumerator(m_first);
+        }
+
+        #endregion
+
+        #region IEnumerable implementation
+
+        IEnumerator IEnumerable.GetEnumerator() {
+            return GetEnumerator();
+        }
+
+        #endregion
+    }
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/MTCustomQueueNode.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -0,0 +1,6 @@
+namespace Implab.Parallels {
+    public class MTCustomQueueNode<TNode> where TNode : MTCustomQueueNode<TNode> {
+        public TNode next;
+    }
+}
+
--- a/Implab/Promise.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab/Promise.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -42,53 +42,68 @@
     /// </remarks>
     public class Promise<T> : IPromise<T> {
 
-        protected struct HandlerDescriptor {
-            public Action<T> resultHandler;
-            public Func<Exception,T> errorHandler;
-            public Action cancellHandler;
-            public Promise<T> medium;
+        protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> {
+            public abstract void Resolve(T result);
+            public abstract void Reject(Exception error);
+            public abstract void Cancel();
+        }
+
+        protected class HandlerDescriptor<T2> : AbstractHandler  {
+
+            readonly Func<T,T2> m_resultHandler;
+            readonly Func<Exception,T2> m_errorHandler;
+            readonly Action m_cancellHandler;
+            readonly Promise<T2> m_medium;
 
-            public void Resolve(T result) {
-                if (resultHandler != null) {
+            public HandlerDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) {
+                m_resultHandler = resultHandler;
+                m_errorHandler = errorHandler;
+                m_cancellHandler = cancelHandler;
+                m_medium = medium;
+            }
+
+            public override void Resolve(T result) {
+                if (m_resultHandler != null) {
                     try {
-                        resultHandler(result);
+                        if (m_medium != null)
+                            m_medium.Resolve(m_resultHandler(result));
+                        else
+                            m_resultHandler(result);
                     } catch (Exception e) {
                         Reject(e);
-                        return;
                     }
-                }
-                if (medium != null)
-                    medium.Resolve(result);
+                } else if(m_medium != null)
+                    m_medium.Resolve(default(T2));
             }
 
-            public void Reject(Exception err) {
-                if (errorHandler != null) {
+            public override void Reject(Exception error) {
+                if (m_errorHandler != null) {
                     try {
-                        var res = errorHandler(err);
-                        if (medium != null)
-                            medium.Resolve(res);
+                        var res = m_errorHandler(error);
+                        if (m_medium != null)
+                            m_medium.Resolve(res);
                         /*} catch (TransientPromiseException err2) {
                         if (medium != null)
                             medium.Reject(err2.InnerException);*/
                     } catch (Exception err2) {
-                        if (medium != null)
-                            medium.Reject(err2);
+                        if (m_medium != null)
+                            m_medium.Reject(err2);
                     }
-                } else if (medium != null)
-                    medium.Reject(err);
+                } else if (m_medium != null)
+                    m_medium.Reject(error);
             }
 
-            public void Cancel() {
-                if (cancellHandler != null) {
+            public override void Cancel() {
+                if (m_cancellHandler != null) {
                     try {
-                        cancellHandler();
+                        m_cancellHandler();
                     } catch (Exception err) {
                         Reject(err);
                         return;
                     }
                 }
-                if (medium != null)
-                    medium.Cancel();
+                if (m_medium != null)
+                    m_medium.Cancel();
             }
         }
 
@@ -103,14 +118,15 @@
         T m_result;
         Exception m_error;
 
-        readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
+        readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>();
+        //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>();
 
         public Promise() {
         }
 
         public Promise(IPromise parent) {
             if (parent != null)
-                AddHandler(
+                AddHandler<T>(
                     null,
                     null,
                     () => {
@@ -215,49 +231,6 @@
             }
         }
 
-        public IPromise<T> Then(Action<T> success, Func<Exception,T> error, Action cancel) {
-            if (success == null && error == null && cancel == null)
-                return this;
-
-            var medium = new Promise<T>(this);
-
-            AddHandler(success, error, cancel, medium, true);
-
-            return medium;
-        }
-
-        /// <summary>
-        /// Adds new handlers to this promise.
-        /// </summary>
-        /// <param name="success">The handler of the successfully completed operation.
-        /// This handler will recieve an operation result as a parameter.</param>
-        /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
-        /// <returns>The new promise chained to this one.</returns>
-        public IPromise<T> Then(Action<T> success, Func<Exception,T> error) {
-            if (success == null && error == null)
-                return this;
-
-            var medium = new Promise<T>(this);
-
-            AddHandler(success, error, null, medium, true);
-
-            return medium;
-        }
-
-        
-
-
-        public IPromise<T> Then(Action<T> success) {
-            if (success == null)
-                return this;
-
-            var medium = new Promise<T>(this);
-
-            AddHandler(success, null, null, medium, true);
-
-            return medium;
-        }
-
         /// <summary>
         /// Последний обработчик в цепочки обещаний.
         /// </summary>
@@ -279,13 +252,19 @@
             if (success == null && error == null && cancel == null)
                 return;
 
-            Func<Exception,T> errorHandler = null;
-            if (error != null)
-                errorHandler = err => {
-                    error(err);
+            AddHandler(
+                success != null ? new Func<T,T>(x => {
+                    success(x);
+                    return x;
+                }) : null,
+                error != null ? new Func<Exception,T>(e => {
+                    error(e);
                     return default(T);
-                };
-            AddHandler(success, errorHandler, cancel, null, false);
+                }) : null,
+                cancel,
+                null,
+                false
+            );
         }
 
         public void On(Action<T> success, Action<Exception> error) {
@@ -299,7 +278,10 @@
         public void On(Action handler, PromiseEventType events) {
             Safe.ArgumentNotNull(handler, "handler");
 
-            Action<T> success = events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null;
+            Func<T,T> success = events.HasFlag(PromiseEventType.Success) ? new Func<T,T>(x => {
+                handler();
+                return x;
+            }) : null;
             Func<Exception,T> error = events.HasFlag(PromiseEventType.Error) ? new Func<Exception,T>(e => {
                 handler();
                 return default(T);
@@ -363,39 +345,11 @@
             // создаем прицепленное обещание
             var medium = new Promise<TNew>(this);
 
-            Action<T> resultHandler = result => medium.Resolve(mapper(result));
-            Func<Exception,T> errorHandler;
-            if (error != null)
-                errorHandler = e => {
-                    try {
-                        medium.Resolve(error(e));
-                    } catch (Exception e2) {
-                        // в случае ошибки нужно передать исключение дальше по цепочке
-                        medium.Reject(e2);
-                    }
-                    return default(T);
-                };
-            else
-                errorHandler = e => {
-                    medium.Reject(e);
-                    return default(T);
-                };
-
-            Action cancelHandler;
-            if (cancel != null)
-                cancelHandler = () => {
-                    cancel();
-                    medium.Cancel();
-                };
-            else
-                cancelHandler = medium.Cancel;
-
-
             AddHandler(
-                resultHandler,
-                errorHandler,
-                cancelHandler,
-                null,
+                mapper,
+                error,
+                cancel,
+                medium,
                 true
             );
 
@@ -431,9 +385,9 @@
             // передать через него результаты работы.
             var medium = new Promise<TNew>(this);
 
-            Action<T> resultHandler = delegate(T result) {
+            Func<T,T> resultHandler = delegate(T result) {
                 if (medium.IsCancelled)
-                    return;
+                    return default(T);
 
                 var promise = chained(result);
 
@@ -454,6 +408,8 @@
                             promise.Cancel();
                     }
                 );
+
+                return default(T);
             };
 
             Func<Exception,T> errorHandler;
@@ -534,7 +490,10 @@
             var medium = new Promise<T>(this);
 
             AddHandler(
-                x => handler(),
+                x => {
+                    handler();
+                    return x;
+                },
                 e => {
                     handler();
                     throw new TransientPromiseException(e);
@@ -600,16 +559,11 @@
             return Join(Timeout.Infinite);
         }
 
-        void AddHandler(Action<T> success, Func<Exception,T> error, Action cancel, Promise<T> medium, bool inc) {
+        void AddHandler<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) {
             if (inc)
                 Interlocked.Increment(ref m_childrenCount);
 
-            var handler = new HandlerDescriptor {
-                resultHandler = success,
-                errorHandler = error,
-                cancellHandler = cancel,
-                medium = medium
-            };
+            AbstractHandler handler = new HandlerDescriptor<T2>(success, error, cancel, medium);
 
             bool queued;
 
@@ -631,7 +585,7 @@
                 InvokeHandler(handler);
         }
 
-        protected virtual void InvokeHandler(HandlerDescriptor handler) {
+        protected virtual void InvokeHandler(AbstractHandler handler) {
             switch (m_state) {
                 case SUCCEEDED_STATE:
                     handler.Resolve(m_result);
@@ -649,7 +603,7 @@
         }
 
         void OnStateChanged() {
-            HandlerDescriptor handler;
+            AbstractHandler handler;
             while (m_handlers.TryDequeue(out handler))
                 InvokeHandler(handler);
         }
@@ -688,16 +642,13 @@
                 var dest = i;
 
                 if (promises[i] != null) {
-                    promises[i].Then(
+                    promises[i].On(
                         x => {
                             result[dest] = x;
                             if (Interlocked.Decrement(ref pending) == 0)
                                 promise.Resolve(result);
                         },
-                        e => {
-                            promise.Reject(e);
-                            return default(T);
-                        }
+                        promise.Reject
                     );
                 } else {
                     if (Interlocked.Decrement(ref pending) == 0)
@@ -776,7 +727,10 @@
 
         IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) {
             return Then(
-                success != null ? new Action<T>(x => success()) : null,
+                success != null ? new Func<T,T>(x => {
+                    success();
+                    return x;
+                }) : null,
                 error != null ? new Func<Exception,T>(e => {
                     error(e);
                     return default(T);
@@ -787,7 +741,10 @@
 
         IPromise IPromise.Then(Action success, Action<Exception> error) {
             return Then(
-                success != null ? new Action<T>(x => success()) : null,
+                success != null ? new Func<T,T>(x => {
+                    success();
+                    return x;
+                }) : null,
                 error != null ? new Func<Exception,T>(e => {
                     error(e);
                     return default(T);
@@ -797,7 +754,10 @@
 
         IPromise IPromise.Then(Action success) {
             Safe.ArgumentNotNull(success, "success");
-            return Then(x => success());
+            return Then(x => {
+                success();
+                return x;
+            });
         }
 
         IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
@@ -809,9 +769,9 @@
 
             var medium = new Promise<object>(this);
 
-            Action<T> resultHandler = delegate {
+            Func<T,T> resultHandler = delegate {
                 if (medium.IsCancelled)
-                    return;
+                    return default(T);
 
                 var promise = chained();
 
@@ -828,6 +788,8 @@
                     if (promise.IsExclusive)
                         promise.Cancel();
                 });
+
+                return default(T);
             };
 
             Func<Exception,T> errorHandler;
--- a/Implab/SyncContextPromise.cs	Mon Nov 10 10:17:54 2014 +0300
+++ b/Implab/SyncContextPromise.cs	Mon Nov 10 18:00:28 2014 +0300
@@ -14,7 +14,7 @@
             Safe.ArgumentNotNull(context, "context");
             m_context = context;
         }
-        protected override void InvokeHandler(HandlerDescriptor handler) {
+        protected override void InvokeHandler(AbstractHandler handler) {
             m_context.Post(x => base.InvokeHandler(handler),null);
         }
     }