changeset 185:822aab37b107 ref20160224

runnable component, work in progress
author cin
date Mon, 18 Apr 2016 16:41:17 +0300 (2016-04-18)
parents d6a8cba73acc
children 75103928da09
files Implab.Test/Implab.Test.mono.csproj Implab.Test/RunnableComponentTests.cs Implab/AbstractEvent.cs Implab/ActionChainTask.cs Implab/Components/RunnableComponent.cs Implab/PromiseExtensions.cs
diffstat 6 files changed, 358 insertions(+), 96 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/Implab.Test.mono.csproj	Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab.Test/Implab.Test.mono.csproj	Mon Apr 18 16:41:17 2016 +0300
@@ -58,6 +58,7 @@
     <Compile Include="PromiseHelper.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="CancelationTests.cs" />
+    <Compile Include="RunnableComponentTests.cs" />
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="..\Implab\Implab.csproj">
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab.Test/RunnableComponentTests.cs	Mon Apr 18 16:41:17 2016 +0300
@@ -0,0 +1,194 @@
+using System;
+using System.Reflection;
+using System.Threading;
+using Implab.Parallels;
+using Implab.Components;
+
+#if MONO
+
+using NUnit.Framework;
+using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
+using TestMethodAttribute = NUnit.Framework.TestAttribute;
+
+#else
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+#endif
+
+namespace Implab.Test {
+    [TestClass]
+    public class RunnableComponentTests {
+
+        static void ShouldThrow(Action action) {
+            try {
+                action();
+                Assert.Fail();
+            } catch(AssertionException) {
+                throw;
+            } catch {
+            }
+        }
+
+        class Runnable : RunnableComponent {
+            public Runnable(bool initialized) : base(initialized) {
+            }
+                
+            public Action MockInit {
+                get;
+                set;
+            }
+
+            public Func<IPromise> MockStart {
+                get;
+                set;
+            }
+
+            public Func<IPromise> MockStop {
+                get;
+                set;
+            }
+
+            protected override IPromise OnStart() {
+                return MockStart != null ? MockStart() : base.OnStart();
+            }
+
+            protected override IPromise OnStop() {
+                return MockStop != null ? MockStop() : base.OnStart();
+            }
+
+            protected override void OnInitialize() {
+                if (MockInit != null)
+                    MockInit();
+            }
+        }
+
+        [TestMethod]
+        public void NormalFlowTest() {
+            var comp = new Runnable(false);
+
+            Assert.AreEqual(ExecutionState.Created, comp.State);
+
+            comp.Init();
+
+            Assert.AreEqual(ExecutionState.Ready, comp.State);
+
+            comp.Start().Join(1000);
+
+            Assert.AreEqual(ExecutionState.Running, comp.State);
+
+            comp.Stop().Join(1000);
+
+            Assert.AreEqual(ExecutionState.Disposed, comp.State);
+
+        }
+
+        [TestMethod]
+        public void InitFailTest() {
+            var comp = new Runnable(false) {
+                MockInit = () => {
+                    throw new Exception("BAD");
+                }
+            };
+
+            ShouldThrow(() => comp.Start());
+            ShouldThrow(() => comp.Stop());
+            Assert.AreEqual(ExecutionState.Created, comp.State);
+
+            ShouldThrow(comp.Init);
+
+            Assert.AreEqual(ExecutionState.Failed, comp.State);
+
+            ShouldThrow(() => comp.Start());
+            ShouldThrow(() => comp.Stop());
+            Assert.AreEqual(ExecutionState.Failed, comp.State);
+
+            comp.Dispose();
+            Assert.AreEqual(ExecutionState.Disposed, comp.State);
+        }
+
+        [TestMethod]
+        public void DisposedTest() {
+
+            var comp = new Runnable(false);
+            comp.Dispose();
+
+            ShouldThrow(() => comp.Start());
+            ShouldThrow(() => comp.Stop());
+            ShouldThrow(comp.Init);
+
+            Assert.AreEqual(ExecutionState.Disposed, comp.State);
+        }
+
+        [TestMethod]
+        public void StartCancelTest() {
+            var comp = new Runnable(true) {
+                MockStart = () => PromiseHelper.Sleep(100000, 0)
+            };
+
+            var p = comp.Start();
+            Assert.AreEqual(ExecutionState.Starting, comp.State);
+            p.Cancel();
+            ShouldThrow(() => p.Join(1000));
+            Assert.AreEqual(ExecutionState.Failed, comp.State);
+            Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
+
+            comp.Dispose();
+        }
+
+        [TestMethod]
+        public void StartStopTest() {
+            var stop = new Signal();
+            var comp = new Runnable(true) {
+                MockStart = () => PromiseHelper.Sleep(100000, 0),
+                MockStop = () => AsyncPool.RunThread(stop.Wait)
+            };
+
+            var p1 = comp.Start();
+            var p2 = comp.Stop();
+            // should enter stopping state
+
+            ShouldThrow(p1.Join);
+            Assert.IsTrue(p1.IsCancelled);
+            Assert.AreEqual(ExecutionState.Stopping, comp.State);
+
+            stop.Set();
+            p2.Join(1000);
+            Assert.AreEqual(ExecutionState.Disposed, comp.State);
+        }
+
+        [TestMethod]
+        public void StartStopFailTest() {
+            var comp = new Runnable(true) {
+                MockStart = () => PromiseHelper.Sleep(100000, 0).Then(null,null,x => { throw new Exception("I'm dead"); })
+            };
+
+            comp.Start();
+            var p = comp.Stop();
+            // if Start fails to cancel, should fail to stop
+            ShouldThrow(() => p.Join(1000));
+            Assert.AreEqual(ExecutionState.Failed, comp.State);
+            Assert.IsNotNull(comp.LastError);
+            Assert.AreEqual("I'm dead", comp.LastError.Message);
+        }
+
+        [TestMethod]
+        public void StopCancelTest() {
+            var comp = new Runnable(true) {
+                MockStop = () => PromiseHelper.Sleep(100000, 0)
+            };
+
+            comp.Start();
+            var p = comp.Stop();
+            Assert.AreEqual(ExecutionState.Stopping, comp.State);
+            p.Cancel();
+            ShouldThrow(() => p.Join(1000));
+            Assert.AreEqual(ExecutionState.Failed, comp.State);
+            Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
+
+            comp.Dispose();
+        }
+
+    }
+}
+
--- a/Implab/AbstractEvent.cs	Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/AbstractEvent.cs	Mon Apr 18 16:41:17 2016 +0300
@@ -77,18 +77,19 @@
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         protected void SetError(Exception error) {
+            while (error is PromiseTransientException)
+                error = error.InnerException;
+
+            var isCancel = error is OperationCanceledException;
+            
             if (BeginTransit()) {
-                if (error is OperationCanceledException) {
-                    m_error = error.InnerException;
-                    CompleteTransit(CANCELLED_STATE);
-                } else {
-                    m_error = error is PromiseTransientException ? error.InnerException : error;
-                    CompleteTransit(REJECTED_STATE);
-                }
+                m_error = isCancel ? error.InnerException : error;
+                CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE);
+
                 Signal();
             } else {
                 WaitTransition();
-                if (m_state == SUCCEEDED_STATE)
+                if (!isCancel || m_state == SUCCEEDED_STATE)
                     throw new InvalidOperationException("The promise is already resolved");
             }
         }
--- a/Implab/ActionChainTask.cs	Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/ActionChainTask.cs	Mon Apr 18 16:41:17 2016 +0300
@@ -4,6 +4,15 @@
     public class ActionChainTask : ActionChainTaskBase, IDeferred {
         readonly Func<IPromise> m_task;
 
+        /// <summary>
+        /// Initializes a new instance of the <see cref="Implab.ActionChainTask"/> class.
+        /// </summary>
+        /// <param name="task">The operation which will be performed when the <see cref="Resolve()"/> is called.</param>
+        /// <param name="error">The error handler which will invoke when the <see cref="Reject(Exception)"/> is called or when the task fails with an error.</param>
+        /// <param name="cancel">The cancellation handler.</param>
+        /// <param name="autoCancellable">If set to <c>true</c> will automatically accept
+        ///  all cancel requests before the task is started with <see cref="Resolve()"/>,
+        /// after that all requests are directed to the task.</param>
         public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
             m_task = task;
         }
--- a/Implab/Components/RunnableComponent.cs	Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/Components/RunnableComponent.cs	Mon Apr 18 16:41:17 2016 +0300
@@ -1,8 +1,7 @@
 using System;
-using Implab.Formats;
 
 namespace Implab.Components {
-    public class RunnableComponent : Disposable, IRunnable, IInitializable {
+    public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
         enum Commands {
             Ok = 0,
             Fail,
@@ -19,8 +18,11 @@
             static StateMachine() {
                 _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];
 
-                Edge(ExecutionState.Created, ExecutionState.Ready, Commands.Ok);
-                Edge(ExecutionState.Created, ExecutionState.Failed, Commands.Fail);
+                Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
+                Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);
+
+                Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
+                Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);
 
                 Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
                 Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);
@@ -36,7 +38,8 @@
 
                 Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
                 Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);
-                Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose);
+
+                Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
             }
 
             static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
@@ -70,72 +73,93 @@
             m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
         }
 
+        protected virtual int DisposeTimeout {
+            get {
+                return 10000;
+            }
+        }
+
         void ThrowInvalidCommand(Commands cmd) {
+            if (m_stateMachine.State == ExecutionState.Disposed)
+                throw new ObjectDisposedException(ToString());
+            
             throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
         }
 
-        protected void Move(Commands cmd) {
-            lock (m_stateMachine)
-                if (!m_stateMachine.Move(cmd))
-                    ThrowInvalidCommand(cmd);
+        void Move(Commands cmd) {
+            if (!m_stateMachine.Move(cmd))
+                ThrowInvalidCommand(cmd);
         }
 
-        protected void Fail(Exception err) {
-            lock (m_stateMachine) {
-                if (!m_stateMachine.Move(Commands.Fail))
-                    ThrowInvalidCommand(Commands.Fail);
-
-                m_lastError = err;
-            }
-        }
-
-        protected void Success() {
-            Move(Commands.Ok);
-        }
-
-        protected void Invoke(Commands cmd, Action action) {
-            Move(cmd);
+        void Invoke(Commands cmd, Action action) {
+            lock (m_stateMachine) 
+                Move(cmd);
+            
             try {
                 action();
-                Move(Commands.Ok);
+                lock(m_stateMachine)
+                    Move(Commands.Ok);
+
             } catch (Exception err) {
-                Fail(err);
+                lock (m_stateMachine) {
+                    Move(Commands.Fail);
+                    m_lastError = err;
+                }
                 throw;
             }
         }
 
-        protected IPromise InvokeAsync(Commands cmd, Func<IPromise> action) {
-            Move(cmd);
-            var medium = new Promise();
+        IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) {
+            IPromise promise = null;
+            IPromise prev;
 
-            IPromise promise = null;
+            var task = new ActionChainTask(action, null, null, true);
+
+            lock (m_stateMachine) {
+                Move(cmd);
+            
+                prev = m_pending;
 
-            promise = medium.Then(
-                () => {
-                    lock(m_stateMachine) {
-                        if (m_pending == promise) {
-                            m_pending = null;
-                            Move(Commands.Ok);
+                promise = task.Then(
+                    () => {
+                        lock(m_stateMachine) {
+                            if (m_pending == promise) {
+                                Move(Commands.Ok);
+                                m_pending = null;
+                            }
                         }
+                    }, e => {
+                        lock(m_stateMachine) {
+                            if (m_pending == promise) {
+                                Move(Commands.Fail);
+                                m_pending = null;
+                                m_lastError = e;
+                            }
+                        }
+                        throw new PromiseTransientException(e);
+                    },
+                    r => {
+                        lock(m_stateMachine) {
+                            if (m_pending == promise) {
+                                Move(Commands.Fail);
+                                m_pending = null;
+                                m_lastError = new OperationCanceledException("The operation has been cancelled", r);
+                            }
+
+                        }
+                        throw new OperationCanceledException("The operation has been cancelled", r);
                     }
-                }, e => {
-                    if (m_pending == promise) {
-                        m_pending = null;
-                        Fail(
-                    }
-                }
-            );
+                );
 
+                m_pending = promise;
+            }
 
+            if (prev == null)
+                task.Resolve();
+            else
+                chain(prev, task);
 
-            return Safe.InvokePromise(action).Then(
-                Success,
-                Fail
-            );
-        }
-
-        void AddPending(IPromise result) {
-
+            return promise;
         }
 
 
@@ -153,43 +177,86 @@
         #region IRunnable implementation
 
         public IPromise Start() {
-            Move(Commands.Start);
-
-            return Safe.InvokePromise(OnStart).Then(
-                () => {
-                    Move(Commands.Ok);
-                    Run();
-                },
-                () => {
-                    Move(Commands.Fail);
-                }
-            );
+            return InvokeAsync(Commands.Start, OnStart, null);
         }
 
         protected virtual IPromise OnStart() {
             return Promise.SUCCESS;
         }
 
-        protected virtual void Run() {
+        public IPromise Stop() {
+            return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose);
+        }
+
+        protected virtual IPromise OnStop() {
+            return Promise.SUCCESS;
         }
 
-        public IPromise Stop() {
-            throw new NotImplementedException();
+        /// <summary>
+        /// Stops the current operation if one exists.
+        /// </summary>
+        /// <param name="current">Current.</param>
+        /// <param name="stop">Stop.</param>
+        protected virtual void StopPending(IPromise current, IDeferred stop) {
+            if (current == null) {
+                stop.Resolve();
+            } else {
+                current.On(stop.Resolve, stop.Reject, stop.CancelOperation);
+                current.Cancel();
+            }
         }
 
         public ExecutionState State {
             get {
-                throw new NotImplementedException();
+                return m_stateMachine.State;
             }
         }
 
         public Exception LastError {
             get {
-                throw new NotImplementedException();
+                return m_lastError;
             }
         }
 
         #endregion
+
+        #region IDisposable implementation
+
+        public void Dispose() {
+            IPromise pending;
+            lock (m_stateMachine) {
+                if (m_stateMachine.State == ExecutionState.Disposed)
+                    return;
+
+                Move(Commands.Dispose);
+
+                GC.SuppressFinalize(this);
+
+                pending = m_pending;
+                m_pending = null;
+            }
+            if (pending != null) {
+                pending.Cancel();
+                pending.Timeout(DisposeTimeout).On(
+                    () => Dispose(true, null),
+                    err => Dispose(true, err),
+                    reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
+                );
+            } else {
+                Dispose(true, m_lastError);
+            }
+        }
+
+        ~RunnableComponent() {
+            Dispose(false, null);
+        }
+
+        #endregion
+
+        protected virtual void Dispose(bool disposing, Exception lastError) {
+
+        }
+
     }
 }
 
--- a/Implab/PromiseExtensions.cs	Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/PromiseExtensions.cs	Mon Apr 18 16:41:17 2016 +0300
@@ -3,11 +3,6 @@
 using Implab.Diagnostics;
 using System.Collections.Generic;
 
-
-#if NET_4_5
-using System.Threading.Tasks;
-#endif
-
 namespace Implab {
     public static class PromiseExtensions {
         public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
@@ -17,12 +12,12 @@
                 return that;
 
             var p = new SyncContextPromise<T>(context);
-            p.On(that.Cancel, PromiseEventType.Cancelled);
+            p.CancellationRequested(that.Cancel);
 
             that.On(
                 p.Resolve,
                 p.Reject,
-                p.Cancel
+                p.CancelOperation
             );
             return p;
         }
@@ -32,13 +27,12 @@
             Safe.ArgumentNotNull(context, "context");
 
             var p = new SyncContextPromise<T>(context);
-            p.On(that.Cancel, PromiseEventType.Cancelled);
-
+            p.CancellationRequested(that.Cancel);
 
             that.On(
                 p.Resolve,
                 p.Reject,
-                p.Cancel
+                p.CancelOperation
             );
             return p;
         }
@@ -77,8 +71,8 @@
             };
         }
 
-        static void CancelCallback(object cookie) {
-            ((ICancellable)cookie).Cancel();
+        static void CancelByTimeoutCallback(object cookie) {
+            ((ICancellable)cookie).Cancel(new TimeoutException());
         }
 
         /// <summary>
@@ -89,7 +83,7 @@
         /// <typeparam name="TPromise">The 1st type parameter.</typeparam>
         public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
             Safe.ArgumentNotNull(that, "that");
-            var timer = new Timer(CancelCallback, that, milliseconds, -1);
+            var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1);
             that.On(timer.Dispose, PromiseEventType.All);
             return that;
         }
@@ -180,8 +174,7 @@
 
             var d = new ActionTask(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
-            if (success != null)
-                d.CancellationRequested(that.Cancel);
+            d.CancellationRequested(that.Cancel);
             return d;
         }
 
@@ -198,8 +191,7 @@
 
             var d = new FuncTask<T>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
-            if (success != null)
-                d.CancellationRequested(that.Cancel);
+            d.CancellationRequested(that.Cancel);
             return d;
         }
 
@@ -215,8 +207,7 @@
             Safe.ArgumentNotNull(that, "that");
             var d = new FuncTask<T,T2>(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
-            if (success != null)
-                d.CancellationRequested(that.Cancel);
+            d.CancellationRequested(that.Cancel);
             return d;
         }
 
@@ -234,8 +225,7 @@
 
             var d = new ActionChainTask(success, error, cancel, false);
             that.On(d.Resolve, d.Reject, d.CancelOperation);
-            if (success != null)
-                d.CancellationRequested(that.Cancel);
+            d.CancellationRequested(that.Cancel);
             return d;
         }