Mercurial > pub > ImplabNet
changeset 185:822aab37b107 ref20160224
runnable component, work in progress
author | cin |
---|---|
date | Mon, 18 Apr 2016 16:41:17 +0300 |
parents | d6a8cba73acc |
children | 75103928da09 |
files | Implab.Test/Implab.Test.mono.csproj Implab.Test/RunnableComponentTests.cs Implab/AbstractEvent.cs Implab/ActionChainTask.cs Implab/Components/RunnableComponent.cs Implab/PromiseExtensions.cs |
diffstat | 6 files changed, 358 insertions(+), 96 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/Implab.Test.mono.csproj Sat Apr 16 03:23:26 2016 +0300 +++ b/Implab.Test/Implab.Test.mono.csproj Mon Apr 18 16:41:17 2016 +0300 @@ -58,6 +58,7 @@ <Compile Include="PromiseHelper.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="CancelationTests.cs" /> + <Compile Include="RunnableComponentTests.cs" /> </ItemGroup> <ItemGroup> <ProjectReference Include="..\Implab\Implab.csproj">
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab.Test/RunnableComponentTests.cs Mon Apr 18 16:41:17 2016 +0300 @@ -0,0 +1,194 @@ +using System; +using System.Reflection; +using System.Threading; +using Implab.Parallels; +using Implab.Components; + +#if MONO + +using NUnit.Framework; +using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; +using TestMethodAttribute = NUnit.Framework.TestAttribute; + +#else + +using Microsoft.VisualStudio.TestTools.UnitTesting; + +#endif + +namespace Implab.Test { + [TestClass] + public class RunnableComponentTests { + + static void ShouldThrow(Action action) { + try { + action(); + Assert.Fail(); + } catch(AssertionException) { + throw; + } catch { + } + } + + class Runnable : RunnableComponent { + public Runnable(bool initialized) : base(initialized) { + } + + public Action MockInit { + get; + set; + } + + public Func<IPromise> MockStart { + get; + set; + } + + public Func<IPromise> MockStop { + get; + set; + } + + protected override IPromise OnStart() { + return MockStart != null ? MockStart() : base.OnStart(); + } + + protected override IPromise OnStop() { + return MockStop != null ? MockStop() : base.OnStart(); + } + + protected override void OnInitialize() { + if (MockInit != null) + MockInit(); + } + } + + [TestMethod] + public void NormalFlowTest() { + var comp = new Runnable(false); + + Assert.AreEqual(ExecutionState.Created, comp.State); + + comp.Init(); + + Assert.AreEqual(ExecutionState.Ready, comp.State); + + comp.Start().Join(1000); + + Assert.AreEqual(ExecutionState.Running, comp.State); + + comp.Stop().Join(1000); + + Assert.AreEqual(ExecutionState.Disposed, comp.State); + + } + + [TestMethod] + public void InitFailTest() { + var comp = new Runnable(false) { + MockInit = () => { + throw new Exception("BAD"); + } + }; + + ShouldThrow(() => comp.Start()); + ShouldThrow(() => comp.Stop()); + Assert.AreEqual(ExecutionState.Created, comp.State); + + ShouldThrow(comp.Init); + + Assert.AreEqual(ExecutionState.Failed, comp.State); + + ShouldThrow(() => comp.Start()); + ShouldThrow(() => comp.Stop()); + Assert.AreEqual(ExecutionState.Failed, comp.State); + + comp.Dispose(); + Assert.AreEqual(ExecutionState.Disposed, comp.State); + } + + [TestMethod] + public void DisposedTest() { + + var comp = new Runnable(false); + comp.Dispose(); + + ShouldThrow(() => comp.Start()); + ShouldThrow(() => comp.Stop()); + ShouldThrow(comp.Init); + + Assert.AreEqual(ExecutionState.Disposed, comp.State); + } + + [TestMethod] + public void StartCancelTest() { + var comp = new Runnable(true) { + MockStart = () => PromiseHelper.Sleep(100000, 0) + }; + + var p = comp.Start(); + Assert.AreEqual(ExecutionState.Starting, comp.State); + p.Cancel(); + ShouldThrow(() => p.Join(1000)); + Assert.AreEqual(ExecutionState.Failed, comp.State); + Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError); + + comp.Dispose(); + } + + [TestMethod] + public void StartStopTest() { + var stop = new Signal(); + var comp = new Runnable(true) { + MockStart = () => PromiseHelper.Sleep(100000, 0), + MockStop = () => AsyncPool.RunThread(stop.Wait) + }; + + var p1 = comp.Start(); + var p2 = comp.Stop(); + // should enter stopping state + + ShouldThrow(p1.Join); + Assert.IsTrue(p1.IsCancelled); + Assert.AreEqual(ExecutionState.Stopping, comp.State); + + stop.Set(); + p2.Join(1000); + Assert.AreEqual(ExecutionState.Disposed, comp.State); + } + + [TestMethod] + public void StartStopFailTest() { + var comp = new Runnable(true) { + MockStart = () => PromiseHelper.Sleep(100000, 0).Then(null,null,x => { throw new Exception("I'm dead"); }) + }; + + comp.Start(); + var p = comp.Stop(); + // if Start fails to cancel, should fail to stop + ShouldThrow(() => p.Join(1000)); + Assert.AreEqual(ExecutionState.Failed, comp.State); + Assert.IsNotNull(comp.LastError); + Assert.AreEqual("I'm dead", comp.LastError.Message); + } + + [TestMethod] + public void StopCancelTest() { + var comp = new Runnable(true) { + MockStop = () => PromiseHelper.Sleep(100000, 0) + }; + + comp.Start(); + var p = comp.Stop(); + Assert.AreEqual(ExecutionState.Stopping, comp.State); + p.Cancel(); + ShouldThrow(() => p.Join(1000)); + Assert.AreEqual(ExecutionState.Failed, comp.State); + Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError); + + comp.Dispose(); + } + + } +} +
--- a/Implab/AbstractEvent.cs Sat Apr 16 03:23:26 2016 +0300 +++ b/Implab/AbstractEvent.cs Mon Apr 18 16:41:17 2016 +0300 @@ -77,18 +77,19 @@ /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> protected void SetError(Exception error) { + while (error is PromiseTransientException) + error = error.InnerException; + + var isCancel = error is OperationCanceledException; + if (BeginTransit()) { - if (error is OperationCanceledException) { - m_error = error.InnerException; - CompleteTransit(CANCELLED_STATE); - } else { - m_error = error is PromiseTransientException ? error.InnerException : error; - CompleteTransit(REJECTED_STATE); - } + m_error = isCancel ? error.InnerException : error; + CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE); + Signal(); } else { WaitTransition(); - if (m_state == SUCCEEDED_STATE) + if (!isCancel || m_state == SUCCEEDED_STATE) throw new InvalidOperationException("The promise is already resolved"); } }
--- a/Implab/ActionChainTask.cs Sat Apr 16 03:23:26 2016 +0300 +++ b/Implab/ActionChainTask.cs Mon Apr 18 16:41:17 2016 +0300 @@ -4,6 +4,15 @@ public class ActionChainTask : ActionChainTaskBase, IDeferred { readonly Func<IPromise> m_task; + /// <summary> + /// Initializes a new instance of the <see cref="Implab.ActionChainTask"/> class. + /// </summary> + /// <param name="task">The operation which will be performed when the <see cref="Resolve()"/> is called.</param> + /// <param name="error">The error handler which will invoke when the <see cref="Reject(Exception)"/> is called or when the task fails with an error.</param> + /// <param name="cancel">The cancellation handler.</param> + /// <param name="autoCancellable">If set to <c>true</c> will automatically accept + /// all cancel requests before the task is started with <see cref="Resolve()"/>, + /// after that all requests are directed to the task.</param> public ActionChainTask(Func<IPromise> task, Func<Exception, IPromise> error, Func<Exception, IPromise> cancel, bool autoCancellable) : base(error,cancel, autoCancellable) { m_task = task; }
--- a/Implab/Components/RunnableComponent.cs Sat Apr 16 03:23:26 2016 +0300 +++ b/Implab/Components/RunnableComponent.cs Mon Apr 18 16:41:17 2016 +0300 @@ -1,8 +1,7 @@ using System; -using Implab.Formats; namespace Implab.Components { - public class RunnableComponent : Disposable, IRunnable, IInitializable { + public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable { enum Commands { Ok = 0, Fail, @@ -19,8 +18,11 @@ static StateMachine() { _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1]; - Edge(ExecutionState.Created, ExecutionState.Ready, Commands.Ok); - Edge(ExecutionState.Created, ExecutionState.Failed, Commands.Fail); + Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init); + Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose); + + Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok); + Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail); Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start); Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose); @@ -36,7 +38,8 @@ Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail); Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok); - Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose); + + Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose); } static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) { @@ -70,72 +73,93 @@ m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created); } + protected virtual int DisposeTimeout { + get { + return 10000; + } + } + void ThrowInvalidCommand(Commands cmd) { + if (m_stateMachine.State == ExecutionState.Disposed) + throw new ObjectDisposedException(ToString()); + throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State)); } - protected void Move(Commands cmd) { - lock (m_stateMachine) - if (!m_stateMachine.Move(cmd)) - ThrowInvalidCommand(cmd); + void Move(Commands cmd) { + if (!m_stateMachine.Move(cmd)) + ThrowInvalidCommand(cmd); } - protected void Fail(Exception err) { - lock (m_stateMachine) { - if (!m_stateMachine.Move(Commands.Fail)) - ThrowInvalidCommand(Commands.Fail); - - m_lastError = err; - } - } - - protected void Success() { - Move(Commands.Ok); - } - - protected void Invoke(Commands cmd, Action action) { - Move(cmd); + void Invoke(Commands cmd, Action action) { + lock (m_stateMachine) + Move(cmd); + try { action(); - Move(Commands.Ok); + lock(m_stateMachine) + Move(Commands.Ok); + } catch (Exception err) { - Fail(err); + lock (m_stateMachine) { + Move(Commands.Fail); + m_lastError = err; + } throw; } } - protected IPromise InvokeAsync(Commands cmd, Func<IPromise> action) { - Move(cmd); - var medium = new Promise(); + IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) { + IPromise promise = null; + IPromise prev; - IPromise promise = null; + var task = new ActionChainTask(action, null, null, true); + + lock (m_stateMachine) { + Move(cmd); + + prev = m_pending; - promise = medium.Then( - () => { - lock(m_stateMachine) { - if (m_pending == promise) { - m_pending = null; - Move(Commands.Ok); + promise = task.Then( + () => { + lock(m_stateMachine) { + if (m_pending == promise) { + Move(Commands.Ok); + m_pending = null; + } } + }, e => { + lock(m_stateMachine) { + if (m_pending == promise) { + Move(Commands.Fail); + m_pending = null; + m_lastError = e; + } + } + throw new PromiseTransientException(e); + }, + r => { + lock(m_stateMachine) { + if (m_pending == promise) { + Move(Commands.Fail); + m_pending = null; + m_lastError = new OperationCanceledException("The operation has been cancelled", r); + } + + } + throw new OperationCanceledException("The operation has been cancelled", r); } - }, e => { - if (m_pending == promise) { - m_pending = null; - Fail( - } - } - ); + ); + m_pending = promise; + } + if (prev == null) + task.Resolve(); + else + chain(prev, task); - return Safe.InvokePromise(action).Then( - Success, - Fail - ); - } - - void AddPending(IPromise result) { - + return promise; } @@ -153,43 +177,86 @@ #region IRunnable implementation public IPromise Start() { - Move(Commands.Start); - - return Safe.InvokePromise(OnStart).Then( - () => { - Move(Commands.Ok); - Run(); - }, - () => { - Move(Commands.Fail); - } - ); + return InvokeAsync(Commands.Start, OnStart, null); } protected virtual IPromise OnStart() { return Promise.SUCCESS; } - protected virtual void Run() { + public IPromise Stop() { + return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose); + } + + protected virtual IPromise OnStop() { + return Promise.SUCCESS; } - public IPromise Stop() { - throw new NotImplementedException(); + /// <summary> + /// Stops the current operation if one exists. + /// </summary> + /// <param name="current">Current.</param> + /// <param name="stop">Stop.</param> + protected virtual void StopPending(IPromise current, IDeferred stop) { + if (current == null) { + stop.Resolve(); + } else { + current.On(stop.Resolve, stop.Reject, stop.CancelOperation); + current.Cancel(); + } } public ExecutionState State { get { - throw new NotImplementedException(); + return m_stateMachine.State; } } public Exception LastError { get { - throw new NotImplementedException(); + return m_lastError; } } #endregion + + #region IDisposable implementation + + public void Dispose() { + IPromise pending; + lock (m_stateMachine) { + if (m_stateMachine.State == ExecutionState.Disposed) + return; + + Move(Commands.Dispose); + + GC.SuppressFinalize(this); + + pending = m_pending; + m_pending = null; + } + if (pending != null) { + pending.Cancel(); + pending.Timeout(DisposeTimeout).On( + () => Dispose(true, null), + err => Dispose(true, err), + reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason)) + ); + } else { + Dispose(true, m_lastError); + } + } + + ~RunnableComponent() { + Dispose(false, null); + } + + #endregion + + protected virtual void Dispose(bool disposing, Exception lastError) { + + } + } }
--- a/Implab/PromiseExtensions.cs Sat Apr 16 03:23:26 2016 +0300 +++ b/Implab/PromiseExtensions.cs Mon Apr 18 16:41:17 2016 +0300 @@ -3,11 +3,6 @@ using Implab.Diagnostics; using System.Collections.Generic; - -#if NET_4_5 -using System.Threading.Tasks; -#endif - namespace Implab { public static class PromiseExtensions { public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) { @@ -17,12 +12,12 @@ return that; var p = new SyncContextPromise<T>(context); - p.On(that.Cancel, PromiseEventType.Cancelled); + p.CancellationRequested(that.Cancel); that.On( p.Resolve, p.Reject, - p.Cancel + p.CancelOperation ); return p; } @@ -32,13 +27,12 @@ Safe.ArgumentNotNull(context, "context"); var p = new SyncContextPromise<T>(context); - p.On(that.Cancel, PromiseEventType.Cancelled); - + p.CancellationRequested(that.Cancel); that.On( p.Resolve, p.Reject, - p.Cancel + p.CancelOperation ); return p; } @@ -77,8 +71,8 @@ }; } - static void CancelCallback(object cookie) { - ((ICancellable)cookie).Cancel(); + static void CancelByTimeoutCallback(object cookie) { + ((ICancellable)cookie).Cancel(new TimeoutException()); } /// <summary> @@ -89,7 +83,7 @@ /// <typeparam name="TPromise">The 1st type parameter.</typeparam> public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise { Safe.ArgumentNotNull(that, "that"); - var timer = new Timer(CancelCallback, that, milliseconds, -1); + var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1); that.On(timer.Dispose, PromiseEventType.All); return that; } @@ -180,8 +174,7 @@ var d = new ActionTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); + d.CancellationRequested(that.Cancel); return d; } @@ -198,8 +191,7 @@ var d = new FuncTask<T>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); + d.CancellationRequested(that.Cancel); return d; } @@ -215,8 +207,7 @@ Safe.ArgumentNotNull(that, "that"); var d = new FuncTask<T,T2>(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); + d.CancellationRequested(that.Cancel); return d; } @@ -234,8 +225,7 @@ var d = new ActionChainTask(success, error, cancel, false); that.On(d.Resolve, d.Reject, d.CancelOperation); - if (success != null) - d.CancellationRequested(that.Cancel); + d.CancellationRequested(that.Cancel); return d; }