Mercurial > pub > ImplabNet
view Implab/Components/RunnableComponent.cs @ 203:4d9830a9bbb8 v2
Added 'Fail' method to RunnableComponent which allows a component to move from
the Running to the Failed state.
Added PollingComponent, a timer-based runnable component.
More tests.
Added FailPromise, a thin class to wrap exceptions.
Fixed error handling in SuccessPromise classes.
author | cin |
---|---|
date | Tue, 18 Oct 2016 17:49:54 +0300 |
parents | 2651cb9a4250 |
children | 8200ab154c8a |
line wrap: on
line source
using System;

namespace Implab.Components {
    /// <summary>
    /// Base class for components whose lifecycle (initialize, start, stop, dispose) is
    /// tracked by an internal state machine.
    /// </summary>
    /// <remarks>
    /// State transitions made by this class are performed while holding the lock on the
    /// private state machine instance. A command which is not allowed in the current state
    /// throws <see cref="InvalidOperationException"/>, or <see cref="ObjectDisposedException"/>
    /// when the component is already disposed; calling <see cref="Dispose()"/> on an already
    /// disposed component is a no-op.
    /// </remarks>
    public abstract class RunnableComponent : IDisposable, IRunnable, IInitializable {
        /// <summary>
        /// Commands which drive the state machine. <c>Ok</c> and <c>Fail</c> report the outcome
        /// of the operation started by one of the other commands.
        /// </summary>
        enum Commands {
            Ok = 0,
            Fail,
            Init,
            Start,
            Stop,
            Dispose,
            // Used only to size the transition table.
            Last = Dispose
        }

        /// <summary>
        /// Table driven state machine: a cell [state, command] holds the next state.
        /// </summary>
        class StateMachine {
            // Cells which are never assigned by Edge() keep the default value of ExecutionState.
            // NOTE(review): Move() treats ExecutionState.Undefined as "no transition", so this
            // presumably relies on Undefined being the zero value of the enum - confirm against
            // the ExecutionState declaration.
            static readonly ExecutionState[,] _transitions;

            static StateMachine() {
                _transitions = new ExecutionState[(int)ExecutionState.Last + 1, (int)Commands.Last + 1];

                Edge(ExecutionState.Created, ExecutionState.Initializing, Commands.Init);
                Edge(ExecutionState.Created, ExecutionState.Disposed, Commands.Dispose);

                Edge(ExecutionState.Initializing, ExecutionState.Ready, Commands.Ok);
                Edge(ExecutionState.Initializing, ExecutionState.Failed, Commands.Fail);

                Edge(ExecutionState.Ready, ExecutionState.Starting, Commands.Start);
                Edge(ExecutionState.Ready, ExecutionState.Disposed, Commands.Dispose);

                Edge(ExecutionState.Starting, ExecutionState.Running, Commands.Ok);
                Edge(ExecutionState.Starting, ExecutionState.Failed, Commands.Fail);
                Edge(ExecutionState.Starting, ExecutionState.Stopping, Commands.Stop);
                Edge(ExecutionState.Starting, ExecutionState.Disposed, Commands.Dispose);

                Edge(ExecutionState.Running, ExecutionState.Failed, Commands.Fail);
                Edge(ExecutionState.Running, ExecutionState.Stopping, Commands.Stop);
                Edge(ExecutionState.Running, ExecutionState.Disposed, Commands.Dispose);

                Edge(ExecutionState.Stopping, ExecutionState.Failed, Commands.Fail);
                // A successful stop goes straight to Disposed, there is no separate "stopped" state.
                Edge(ExecutionState.Stopping, ExecutionState.Disposed, Commands.Ok);

                Edge(ExecutionState.Failed, ExecutionState.Disposed, Commands.Dispose);
            }

            /// <summary>
            /// Registers the transition from <paramref name="s1"/> to <paramref name="s2"/>
            /// on the command <paramref name="cmd"/>.
            /// </summary>
            static void Edge(ExecutionState s1, ExecutionState s2, Commands cmd) {
                _transitions[(int)s1, (int)cmd] = s2;
            }

            /// <summary>
            /// The current state of the machine.
            /// </summary>
            public ExecutionState State {
                get;
                private set;
            }

            public StateMachine(ExecutionState initial) {
                State = initial;
            }

            /// <summary>
            /// Tries to apply the command to the current state.
            /// </summary>
            /// <returns><c>true</c> if the state has been changed, <c>false</c> if the table has no
            /// transition for the command in the current state (the state is left intact).</returns>
            public bool Move(Commands cmd) {
                var next = _transitions[(int)State, (int)cmd];
                if (next == ExecutionState.Undefined)
                    return false;
                State = next;
                return true;
            }
        }

        // The promise of the asynchronous operation (start or stop) which is currently in progress.
        IPromise m_pending;

        // The last error recorded by Fail(), Invoke() or InvokeAsync().
        Exception m_lastError;

        // Also serves as the lock object for the state, m_pending and m_lastError.
        readonly StateMachine m_stateMachine;

        /// <summary>
        /// Creates the component.
        /// </summary>
        /// <param name="initialized">If <c>true</c> the component starts in the <c>Ready</c> state,
        /// otherwise it starts in the <c>Created</c> state and has to be initialized with
        /// <see cref="Init()"/>.</param>
        protected RunnableComponent(bool initialized) {
            m_stateMachine = new StateMachine(initialized ? ExecutionState.Ready : ExecutionState.Created);
            // NOTE(review): the value is handed to IPromise.Timeout(), presumably milliseconds - confirm.
            DisposeTimeout = 10000;
        }

        /// <summary>
        /// Gets or sets the timeout to wait for the pending operation to complete. If the pending
        /// operation doesn't finish in time then the component will be disposed anyway.
        /// </summary>
        protected int DisposeTimeout {
            get;
            set;
        }

        /// <summary>
        /// Throws the exception which reports that the command can't be executed in the current state.
        /// </summary>
        void ThrowInvalidCommand(Commands cmd) {
            if (m_stateMachine.State == ExecutionState.Disposed)
                throw new ObjectDisposedException(ToString());

            throw new InvalidOperationException(String.Format("Command {0} is not allowed in the state {1}", cmd, m_stateMachine.State));
        }

        /// <summary>
        /// Applies the command to the state machine and throws if the transition isn't allowed.
        /// Callers in this class hold the lock on the state machine.
        /// </summary>
        void Move(Commands cmd) {
            if (!m_stateMachine.Move(cmd))
                ThrowInvalidCommand(cmd);
        }

        /// <summary>
        /// Moves the component from running to failed state.
        /// </summary>
        /// <param name="error">The exception which is describing the error.</param>
        /// <returns>Returns true if the component is set to the failed state, false - otherwise.
        /// This method works only for the running state, in any other state it will return false.</returns>
        protected bool Fail(Exception error) {
            lock (m_stateMachine) {
                if (m_stateMachine.State == ExecutionState.Running) {
                    m_stateMachine.Move(Commands.Fail);
                    m_lastError = error;
                    return true;
                }
            }
            return false;
        }

        /// <summary>
        /// Runs a synchronous lifecycle operation: applies <paramref name="cmd"/>, executes
        /// <paramref name="action"/> and then reports <c>Ok</c>. If anything inside the guarded
        /// section throws, the <c>Fail</c> command is applied, the exception is stored as the
        /// last error and rethrown.
        /// </summary>
        void Invoke(Commands cmd, Action action) {
            lock (m_stateMachine)
                Move(cmd);

            try {
                // The action itself runs outside of the lock.
                action();
                lock (m_stateMachine)
                    Move(Commands.Ok);
            } catch (Exception err) {
                lock (m_stateMachine) {
                    Move(Commands.Fail);
                    m_lastError = err;
                }
                throw;
            }
        }

        /// <summary>
        /// Runs an asynchronous lifecycle operation and makes it the pending operation.
        /// </summary>
        /// <param name="cmd">The command which starts the operation.</param>
        /// <param name="action">The factory of the promise which performs the operation.</param>
        /// <param name="chain">Invoked with the previous pending operation and the new task when
        /// some operation is already pending; it decides when the new task is started. It isn't
        /// used when nothing is pending.</param>
        /// <returns>The promise of the operation including the state transition made on its
        /// completion.</returns>
        IPromise InvokeAsync(Commands cmd, Func<IPromise> action, Action<IPromise, IDeferred> chain) {
            // Assigned below, the handlers capture the variable to check that the operation
            // they belong to is still the pending one.
            IPromise promise = null;
            IPromise prev;

            var task = new ActionChainTask(action, null, null, true);

            lock (m_stateMachine) {
                Move(cmd);

                prev = m_pending;

                // The same handler serves both the error and the cancellation, a cancellation
                // without a reason is reported as OperationCanceledException.
                Action<Exception> errorOrCancel = e => {
                    if (e == null)
                        e = new OperationCanceledException();

                    lock (m_stateMachine) {
                        // Ignore the result if this operation has been superseded by another one.
                        if (m_pending == promise) {
                            Move(Commands.Fail);
                            m_pending = null;
                            m_lastError = e;
                        }
                    }
                    // NOTE(review): presumably the wrapper makes the promise machinery pass the
                    // original error to the next handlers - confirm against PromiseTransientException.
                    throw new PromiseTransientException(e);
                };

                promise = task.Then(
                    () => {
                        lock (m_stateMachine) {
                            // Ignore the result if this operation has been superseded by another one.
                            if (m_pending == promise) {
                                Move(Commands.Ok);
                                m_pending = null;
                            }
                        }
                    },
                    errorOrCancel,
                    errorOrCancel
                );

                m_pending = promise;
            }

            // The task is started outside of the lock.
            if (prev == null)
                task.Resolve();
            else
                chain(prev, task);

            return promise;
        }

        #region IInitializable implementation

        /// <summary>
        /// Initializes the component by calling <see cref="OnInitialize()"/>. On success the
        /// component becomes ready, on error it becomes failed and the exception is rethrown.
        /// </summary>
        public void Init() {
            Invoke(Commands.Init, OnInitialize);
        }

        /// <summary>
        /// Override to perform the initialization, the default implementation does nothing.
        /// </summary>
        protected virtual void OnInitialize() {
        }

        #endregion

        #region IRunnable implementation

        /// <summary>
        /// Starts the component using <see cref="OnStart()"/>.
        /// </summary>
        /// <returns>The promise of the start operation.</returns>
        public IPromise Start() {
            // No chain delegate is supplied: InvokeAsync uses it only when an operation is
            // already pending.
            return InvokeAsync(Commands.Start, OnStart, null);
        }

        /// <summary>
        /// Override to perform the start, the default implementation returns
        /// <c>Promise.SUCCESS</c>.
        /// </summary>
        protected virtual IPromise OnStart() {
            return Promise.SUCCESS;
        }

        /// <summary>
        /// Stops the component using <see cref="OnStop()"/> and releases its resources when
        /// the stop operation succeeds.
        /// </summary>
        /// <returns>The promise of the stop operation including the final cleanup.</returns>
        public IPromise Stop() {
            return InvokeAsync(Commands.Stop, OnStop, StopPending).Then(DisposeAfterStop);
        }

        /// <summary>
        /// Releases the resources after the stop operation has succeeded.
        /// </summary>
        /// <remarks>
        /// The successful stop moves the component directly to the disposed state, therefore
        /// <see cref="Dispose()"/> would return at once without running the cleanup and without
        /// suppressing the finalizer. In that case the cleanup is done here.
        /// </remarks>
        void DisposeAfterStop() {
            bool disposedByStop;
            lock (m_stateMachine)
                disposedByStop = m_stateMachine.State == ExecutionState.Disposed;

            if (disposedByStop) {
                GC.SuppressFinalize(this);
                Dispose(true, m_lastError);
            } else {
                // The state wasn't changed by the stop operation, use the regular path.
                Dispose();
            }
        }

        /// <summary>
        /// Override to perform the stop, the default implementation returns
        /// <c>Promise.SUCCESS</c>.
        /// </summary>
        protected virtual IPromise OnStop() {
            return Promise.SUCCESS;
        }

        /// <summary>
        /// Stops the current operation if one exists.
        /// </summary>
        /// <param name="current">The operation which is pending at the moment of the stop request.</param>
        /// <param name="stop">The deferred stop operation, resolving it lets the stop begin.</param>
        protected virtual void StopPending(IPromise current, IDeferred stop) {
            if (current == null) {
                stop.Resolve();
            } else {
                // link the current operation to the stop operation
                current.On(
                    stop.Resolve, // if the current operation has completed, the stop may begin
                    stop.Reject, // if the current operation has failed, things are bad and we can't continue
                    e => stop.Resolve() // if the current operation was cancelled, the stop may begin
                );
                // send the stop signal to the current operation
                current.Cancel();
            }
        }

        /// <summary>
        /// The current state of the component.
        /// </summary>
        public ExecutionState State {
            get {
                return m_stateMachine.State;
            }
        }

        /// <summary>
        /// The last error recorded by the component, if any.
        /// </summary>
        public Exception LastError {
            get {
                return m_lastError;
            }
        }

        #endregion

        #region IDisposable implementation

        /// <summary>
        /// Releases all resource used by the <see cref="Implab.Components.RunnableComponent"/> object.
        /// </summary>
        /// <remarks>
        /// <para>Will not try to stop the component, it will just release all resources.
        /// To cleanup the component gracefully use <see cref="Stop()"/> method.</para>
        /// <para>
        /// In normal cases the <see cref="Dispose()"/> method shouldn't be called, the call to the <see cref="Stop()"/>
        /// method is sufficient to cleanup the component. Call <see cref="Dispose()"/> only to cleanup after errors,
        /// especially if <see cref="Stop"/> method has failed. Using this method instead of <see cref="Stop()"/> may
        /// lead to the data loss by the component.
        /// </para></remarks>
        public void Dispose() {
            IPromise pending;

            lock (m_stateMachine) {
                if (m_stateMachine.State == ExecutionState.Disposed)
                    return;

                Move(Commands.Dispose);

                GC.SuppressFinalize(this);

                pending = m_pending;
                m_pending = null;
            }

            if (pending != null) {
                // Ask the pending operation to cancel and wait for it no longer than
                // DisposeTimeout, the cleanup runs whatever the outcome is.
                pending.Cancel();
                pending.Timeout(DisposeTimeout).On(
                    () => Dispose(true, null),
                    err => Dispose(true, err),
                    reason => Dispose(true, new OperationCanceledException("The operation is cancelled", reason))
                );
            } else {
                Dispose(true, m_lastError);
            }
        }

        ~RunnableComponent() {
            Dispose(false, null);
        }

        #endregion

        /// <summary>
        /// Override to release the resources of the component, the default implementation
        /// does nothing.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called as a result of <see cref="Dispose()"/>
        /// or a successful <see cref="Stop()"/>, <c>false</c> when called from the finalizer.</param>
        /// <param name="lastError">The error related to the disposal if any: the last recorded
        /// error, or the error or cancellation of the operation which was pending.</param>
        protected virtual void Dispose(bool disposing, Exception lastError) {
        }
    }
}