# HG changeset patch
# User cin
# Date 1460986877 -10800
# Node ID 822aab37b107951b00cb44f7661c09c5c1b30320
# Parent d6a8cba73acc6ab2dff5e87878e29faa9b3b5cdf
runnable component, work in progress
diff -r d6a8cba73acc -r 822aab37b107 Implab.Test/Implab.Test.mono.csproj
--- a/Implab.Test/Implab.Test.mono.csproj Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab.Test/Implab.Test.mono.csproj Mon Apr 18 16:41:17 2016 +0300
@@ -58,6 +58,7 @@
+
diff -r d6a8cba73acc -r 822aab37b107 Implab.Test/RunnableComponentTests.cs
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab.Test/RunnableComponentTests.cs Mon Apr 18 16:41:17 2016 +0300
@@ -0,0 +1,194 @@
+using System;
+using System.Reflection;
+using System.Threading;
+using Implab.Parallels;
+using Implab.Components;
+
+#if MONO
+
+using NUnit.Framework;
+using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
+using TestMethodAttribute = NUnit.Framework.TestAttribute;
+
+#else
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+#endif
+
+namespace Implab.Test {
+ [TestClass]
+ public class RunnableComponentTests {
+
+ static void ShouldThrow(Action action) {
+ try {
+ action();
+ Assert.Fail();
+ } catch(AssertionException) {
+ throw;
+ } catch {
+ }
+ }
+
+ class Runnable : RunnableComponent {
+ public Runnable(bool initialized) : base(initialized) {
+ }
+
+ public Action MockInit {
+ get;
+ set;
+ }
+
+ public Func MockStart {
+ get;
+ set;
+ }
+
+ public Func MockStop {
+ get;
+ set;
+ }
+
+ protected override IPromise OnStart() {
+ return MockStart != null ? MockStart() : base.OnStart();
+ }
+
+ protected override IPromise OnStop() {
+ return MockStop != null ? MockStop() : base.OnStart();
+ }
+
+ protected override void OnInitialize() {
+ if (MockInit != null)
+ MockInit();
+ }
+ }
+
+ [TestMethod]
+ public void NormalFlowTest() {
+ var comp = new Runnable(false);
+
+ Assert.AreEqual(ExecutionState.Created, comp.State);
+
+ comp.Init();
+
+ Assert.AreEqual(ExecutionState.Ready, comp.State);
+
+ comp.Start().Join(1000);
+
+ Assert.AreEqual(ExecutionState.Running, comp.State);
+
+ comp.Stop().Join(1000);
+
+ Assert.AreEqual(ExecutionState.Disposed, comp.State);
+
+ }
+
+ [TestMethod]
+ public void InitFailTest() {
+ var comp = new Runnable(false) {
+ MockInit = () => {
+ throw new Exception("BAD");
+ }
+ };
+
+ ShouldThrow(() => comp.Start());
+ ShouldThrow(() => comp.Stop());
+ Assert.AreEqual(ExecutionState.Created, comp.State);
+
+ ShouldThrow(comp.Init);
+
+ Assert.AreEqual(ExecutionState.Failed, comp.State);
+
+ ShouldThrow(() => comp.Start());
+ ShouldThrow(() => comp.Stop());
+ Assert.AreEqual(ExecutionState.Failed, comp.State);
+
+ comp.Dispose();
+ Assert.AreEqual(ExecutionState.Disposed, comp.State);
+ }
+
+ [TestMethod]
+ public void DisposedTest() {
+
+ var comp = new Runnable(false);
+ comp.Dispose();
+
+ ShouldThrow(() => comp.Start());
+ ShouldThrow(() => comp.Stop());
+ ShouldThrow(comp.Init);
+
+ Assert.AreEqual(ExecutionState.Disposed, comp.State);
+ }
+
+ [TestMethod]
+ public void StartCancelTest() {
+ var comp = new Runnable(true) {
+ MockStart = () => PromiseHelper.Sleep(100000, 0)
+ };
+
+ var p = comp.Start();
+ Assert.AreEqual(ExecutionState.Starting, comp.State);
+ p.Cancel();
+ ShouldThrow(() => p.Join(1000));
+ Assert.AreEqual(ExecutionState.Failed, comp.State);
+ Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
+
+ comp.Dispose();
+ }
+
+ [TestMethod]
+ public void StartStopTest() {
+ var stop = new Signal();
+ var comp = new Runnable(true) {
+ MockStart = () => PromiseHelper.Sleep(100000, 0),
+ MockStop = () => AsyncPool.RunThread(stop.Wait)
+ };
+
+ var p1 = comp.Start();
+ var p2 = comp.Stop();
+ // should enter stopping state
+
+ ShouldThrow(p1.Join);
+ Assert.IsTrue(p1.IsCancelled);
+ Assert.AreEqual(ExecutionState.Stopping, comp.State);
+
+ stop.Set();
+ p2.Join(1000);
+ Assert.AreEqual(ExecutionState.Disposed, comp.State);
+ }
+
+ [TestMethod]
+ public void StartStopFailTest() {
+ var comp = new Runnable(true) {
+ MockStart = () => PromiseHelper.Sleep(100000, 0).Then(null,null,x => { throw new Exception("I'm dead"); })
+ };
+
+ comp.Start();
+ var p = comp.Stop();
+ // if Start fails to cancel, should fail to stop
+ ShouldThrow(() => p.Join(1000));
+ Assert.AreEqual(ExecutionState.Failed, comp.State);
+ Assert.IsNotNull(comp.LastError);
+ Assert.AreEqual("I'm dead", comp.LastError.Message);
+ }
+
+ [TestMethod]
+ public void StopCancelTest() {
+ var comp = new Runnable(true) {
+ MockStop = () => PromiseHelper.Sleep(100000, 0)
+ };
+
+ comp.Start();
+ var p = comp.Stop();
+ Assert.AreEqual(ExecutionState.Stopping, comp.State);
+ p.Cancel();
+ ShouldThrow(() => p.Join(1000));
+ Assert.AreEqual(ExecutionState.Failed, comp.State);
+ Assert.IsInstanceOfType(typeof(OperationCanceledException), comp.LastError);
+
+ comp.Dispose();
+ }
+
+ }
+}
+
diff -r d6a8cba73acc -r 822aab37b107 Implab/AbstractEvent.cs
--- a/Implab/AbstractEvent.cs Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/AbstractEvent.cs Mon Apr 18 16:41:17 2016 +0300
@@ -77,18 +77,19 @@
/// Исключение возникшее при выполнении операции
/// Данное обещание уже выполнено
protected void SetError(Exception error) {
+ while (error is PromiseTransientException)
+ error = error.InnerException;
+
+ var isCancel = error is OperationCanceledException;
+
if (BeginTransit()) {
- if (error is OperationCanceledException) {
- m_error = error.InnerException;
- CompleteTransit(CANCELLED_STATE);
- } else {
- m_error = error is PromiseTransientException ? error.InnerException : error;
- CompleteTransit(REJECTED_STATE);
- }
+ m_error = isCancel ? error.InnerException : error;
+ CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE);
+
Signal();
} else {
WaitTransition();
- if (m_state == SUCCEEDED_STATE)
+ if (!isCancel || m_state == SUCCEEDED_STATE)
throw new InvalidOperationException("The promise is already resolved");
}
}
diff -r d6a8cba73acc -r 822aab37b107 Implab/ActionChainTask.cs
--- a/Implab/ActionChainTask.cs Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/ActionChainTask.cs Mon Apr 18 16:41:17 2016 +0300
@@ -4,6 +4,15 @@
public class ActionChainTask : ActionChainTaskBase, IDeferred {
readonly Func m_task;
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The operation which will be performed when the is called.
+ /// The error handler which will invoke when the is called or when the task fails with an error.
+ /// The cancellation handler.
+ /// If set to true will automatically accept
+ /// all cancel requests before the task is started with ,
+ /// after that all requests are directed to the task.
public ActionChainTask(Func task, Func error, Func cancel, bool autoCancellable) : base(error,cancel, autoCancellable) {
m_task = task;
}
diff -r d6a8cba73acc -r 822aab37b107 Implab/Components/RunnableComponent.cs
--- a/Implab/Components/RunnableComponent.cs Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/Components/RunnableComponent.cs Mon Apr 18 16:41:17 2016 +0300
@@ -1,8 +1,7 @@
using System;
-using Implab.Formats;
namespace Implab.Components {
- public class RunnableComponent : Disposable, IRunnable, IInitializable {
+ public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
enum Commands {
Ok = 0,
Fail,
@@ -19,8 +18,11 @@
static StateMachine() {
_transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];
- Edge(ExecutionState.Created, ExecutionState.Ready, Commands.Ok);
- Edge(ExecutionState.Created, ExecutionState.Failed, Commands.Fail);
+ Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
+ Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);
+
+ Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
+ Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);
Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);
@@ -36,7 +38,8 @@
Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);
- Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Dispose);
+
+ Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
}
static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
@@ -70,72 +73,93 @@
m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
}
+ protected virtual int DisposeTimeout {
+ get {
+ return 10000;
+ }
+ }
+
void ThrowInvalidCommand(Commands cmd) {
+ if (m_stateMachine.State == ExecutionState.Disposed)
+ throw new ObjectDisposedException(ToString());
+
throw new InvalidOperationException(String.Format("Commnd {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
}
- protected void Move(Commands cmd) {
- lock (m_stateMachine)
- if (!m_stateMachine.Move(cmd))
- ThrowInvalidCommand(cmd);
+ void Move(Commands cmd) {
+ if (!m_stateMachine.Move(cmd))
+ ThrowInvalidCommand(cmd);
}
- protected void Fail(Exception err) {
- lock (m_stateMachine) {
- if (!m_stateMachine.Move(Commands.Fail))
- ThrowInvalidCommand(Commands.Fail);
-
- m_lastError = err;
- }
- }
-
- protected void Success() {
- Move(Commands.Ok);
- }
-
- protected void Invoke(Commands cmd, Action action) {
- Move(cmd);
+ void Invoke(Commands cmd, Action action) {
+ lock (m_stateMachine)
+ Move(cmd);
+
try {
action();
- Move(Commands.Ok);
+ lock(m_stateMachine)
+ Move(Commands.Ok);
+
} catch (Exception err) {
- Fail(err);
+ lock (m_stateMachine) {
+ Move(Commands.Fail);
+ m_lastError = err;
+ }
throw;
}
}
- protected IPromise InvokeAsync(Commands cmd, Func action) {
- Move(cmd);
- var medium = new Promise();
+ IPromise InvokeAsync(Commands cmd, Func action, Action chain) {
+ IPromise promise = null;
+ IPromise prev;
- IPromise promise = null;
+ var task = new ActionChainTask(action, null, null, true);
+
+ lock (m_stateMachine) {
+ Move(cmd);
+
+ prev = m_pending;
- promise = medium.Then(
- () => {
- lock(m_stateMachine) {
- if (m_pending == promise) {
- m_pending = null;
- Move(Commands.Ok);
+ promise = task.Then(
+ () => {
+ lock(m_stateMachine) {
+ if (m_pending == promise) {
+ Move(Commands.Ok);
+ m_pending = null;
+ }
}
+ }, e => {
+ lock(m_stateMachine) {
+ if (m_pending == promise) {
+ Move(Commands.Fail);
+ m_pending = null;
+ m_lastError = e;
+ }
+ }
+ throw new PromiseTransientException(e);
+ },
+ r => {
+ lock(m_stateMachine) {
+ if (m_pending == promise) {
+ Move(Commands.Fail);
+ m_pending = null;
+ m_lastError = new OperationCanceledException("The operation has been cancelled", r);
+ }
+
+ }
+ throw new OperationCanceledException("The operation has been cancelled", r);
}
- }, e => {
- if (m_pending == promise) {
- m_pending = null;
- Fail(
- }
- }
- );
+ );
+ m_pending = promise;
+ }
+ if (prev == null)
+ task.Resolve();
+ else
+ chain(prev, task);
- return Safe.InvokePromise(action).Then(
- Success,
- Fail
- );
- }
-
- void AddPending(IPromise result) {
-
+ return promise;
}
@@ -153,43 +177,86 @@
#region IRunnable implementation
public IPromise Start() {
- Move(Commands.Start);
-
- return Safe.InvokePromise(OnStart).Then(
- () => {
- Move(Commands.Ok);
- Run();
- },
- () => {
- Move(Commands.Fail);
- }
- );
+ return InvokeAsync(Commands.Start, OnStart, null);
}
protected virtual IPromise OnStart() {
return Promise.SUCCESS;
}
- protected virtual void Run() {
+ public IPromise Stop() {
+ return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(Dispose);
+ }
+
+ protected virtual IPromise OnStop() {
+ return Promise.SUCCESS;
}
- public IPromise Stop() {
- throw new NotImplementedException();
+ ///
+ /// Stops the current operation if one exists.
+ ///
+ /// Current.
+ /// Stop.
+ protected virtual void StopPending(IPromise current, IDeferred stop) {
+ if (current == null) {
+ stop.Resolve();
+ } else {
+ current.On(stop.Resolve, stop.Reject, stop.CancelOperation);
+ current.Cancel();
+ }
}
public ExecutionState State {
get {
- throw new NotImplementedException();
+ return m_stateMachine.State;
}
}
public Exception LastError {
get {
- throw new NotImplementedException();
+ return m_lastError;
}
}
#endregion
+
+ #region IDisposable implementation
+
+ public void Dispose() {
+ IPromise pending;
+ lock (m_stateMachine) {
+ if (m_stateMachine.State == ExecutionState.Disposed)
+ return;
+
+ Move(Commands.Dispose);
+
+ GC.SuppressFinalize(this);
+
+ pending = m_pending;
+ m_pending = null;
+ }
+ if (pending != null) {
+ pending.Cancel();
+ pending.Timeout(DisposeTimeout).On(
+ () => Dispose(true, null),
+ err => Dispose(true, err),
+ reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
+ );
+ } else {
+ Dispose(true, m_lastError);
+ }
+ }
+
+ ~RunnableComponent() {
+ Dispose(false, null);
+ }
+
+ #endregion
+
+ protected virtual void Dispose(bool disposing, Exception lastError) {
+
+ }
+
}
}
diff -r d6a8cba73acc -r 822aab37b107 Implab/PromiseExtensions.cs
--- a/Implab/PromiseExtensions.cs Sat Apr 16 03:23:26 2016 +0300
+++ b/Implab/PromiseExtensions.cs Mon Apr 18 16:41:17 2016 +0300
@@ -3,11 +3,6 @@
using Implab.Diagnostics;
using System.Collections.Generic;
-
-#if NET_4_5
-using System.Threading.Tasks;
-#endif
-
namespace Implab {
public static class PromiseExtensions {
public static IPromise DispatchToCurrentContext(this IPromise that) {
@@ -17,12 +12,12 @@
return that;
var p = new SyncContextPromise(context);
- p.On(that.Cancel, PromiseEventType.Cancelled);
+ p.CancellationRequested(that.Cancel);
that.On(
p.Resolve,
p.Reject,
- p.Cancel
+ p.CancelOperation
);
return p;
}
@@ -32,13 +27,12 @@
Safe.ArgumentNotNull(context, "context");
var p = new SyncContextPromise(context);
- p.On(that.Cancel, PromiseEventType.Cancelled);
-
+ p.CancellationRequested(that.Cancel);
that.On(
p.Resolve,
p.Reject,
- p.Cancel
+ p.CancelOperation
);
return p;
}
@@ -77,8 +71,8 @@
};
}
- static void CancelCallback(object cookie) {
- ((ICancellable)cookie).Cancel();
+ static void CancelByTimeoutCallback(object cookie) {
+ ((ICancellable)cookie).Cancel(new TimeoutException());
}
///
@@ -89,7 +83,7 @@
/// The 1st type parameter.
public static TPromise Timeout(this TPromise that, int milliseconds) where TPromise : IPromise {
Safe.ArgumentNotNull(that, "that");
- var timer = new Timer(CancelCallback, that, milliseconds, -1);
+ var timer = new Timer(CancelByTimeoutCallback, that, milliseconds, -1);
that.On(timer.Dispose, PromiseEventType.All);
return that;
}
@@ -180,8 +174,7 @@
var d = new ActionTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
- if (success != null)
- d.CancellationRequested(that.Cancel);
+ d.CancellationRequested(that.Cancel);
return d;
}
@@ -198,8 +191,7 @@
var d = new FuncTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
- if (success != null)
- d.CancellationRequested(that.Cancel);
+ d.CancellationRequested(that.Cancel);
return d;
}
@@ -215,8 +207,7 @@
Safe.ArgumentNotNull(that, "that");
var d = new FuncTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
- if (success != null)
- d.CancellationRequested(that.Cancel);
+ d.CancellationRequested(that.Cancel);
return d;
}
@@ -234,8 +225,7 @@
var d = new ActionChainTask(success, error, cancel, false);
that.On(d.Resolve, d.Reject, d.CancelOperation);
- if (success != null)
- d.CancellationRequested(that.Cancel);
+ d.CancellationRequested(that.Cancel);
return d;
}