Mercurial > pub > ImplabNet
diff Implab/Components/RunnableComponent.cs @ 251:7c7e9ad6fe4a v3
Prerelease version of RunnableComponent
Added draft messaging interfaces
Added more more helpers to Xml/SerializationHelpers
author | cin |
---|---|
date | Sun, 11 Feb 2018 00:49:51 +0300 |
parents | 9f63dade3a40 |
children | 6f4630d0bcd9 |
line wrap: on
line diff
--- a/Implab/Components/RunnableComponent.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/RunnableComponent.cs Sun Feb 11 00:49:51 2018 +0300 @@ -1,57 +1,273 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -namespace Implab.Components -{ - public class RunnableComponent : IRunnable { +namespace Implab.Components { + /// <summary> + /// Base class for implementing components which support start and stop operations, + /// such components may represent running services. + /// </summary> + /// <remarks> + /// This class provides a basic lifecycle from the creation to the + /// termination of the component. + /// </remarks> + public class RunnableComponent : IRunnable, IInitializable, IDisposable { + + /// <summary> + /// This class bound <see cref="CancellationTokenSource"/> lifetime to the task, + /// when the task completes the associated token source will be disposed. + /// </summary> + class AsyncOperationDescriptor { + + public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor(); + + readonly CancellationTokenSource m_cts; + + bool m_done; + + public CancellationToken Token { + get { return m_cts == null ? CancellationToken.None : m_cts.Token; } + } + + public Task Task { get; private set; } + + private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) { + m_cts = cts; + Task = Chain(task); + } + + private AsyncOperationDescriptor() { + Task = Task.CompletedTask; + } + public void Cancel() { + if (m_cts != null) { + lock (m_cts) { + if (!m_done) + m_cts.Cancel(); + } + } + } + + void Done() { + if (m_cts != null) { + lock (m_cts) { + m_done = true; + m_cts.Dispose(); + } + } else { + m_done = true; + } + } + + async Task Chain(Task other) { + try { + await other; + } finally { + Done(); + } + } + + public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) { + var cts = ct.CanBeCanceled ? + CancellationTokenSource.CreateLinkedTokenSource(ct) : + new CancellationTokenSource(); + + return new AsyncOperationDescriptor(factory(cts.Token), cts); + } + + } + + // this lock is used to synchronize state flow of the component during + // completions or the operations. readonly object m_lock = new object(); - CancellationTokenSource m_cts; + // current operation cookie, used to check wheather a call to + // MoveSuccess/MoveFailed method belongs to the current + // operation, if cookies didn't match ignore completion result. + object m_cookie; - public Task<ExecutionState> Completion { - get; - private set; + AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None; + + ExecutionState m_state; + + + protected RunnableComponent(bool initialized) { + State = initialized ? ExecutionState.Ready : ExecutionState.Created; } - public ExecutionState State => throw new NotImplementedException(); + public Task Completion { + get { return m_current.Task; } + } - public Exception LastError => throw new NotImplementedException(); + public ExecutionState State { + get { return m_state; } + private set { + if (m_state != value) { + m_state = value; + StateChanged.DispatchEvent(this, new StateChangeEventArgs { + State = value, + LastError = LastError + }); + } + } + } + + public Exception LastError { get; private set; } public event EventHandler<StateChangeEventArgs> StateChanged; + /// <summary> + /// Releases all resources used by the current component regardless of its + /// execution state. + /// </summary> + /// <remarks> + /// Calling to this method may result unexpedted results if the component + /// isn't in the stopped state. Call this method after the component is + /// stopped if needed or if the component is in the failed state. + /// </remarks> public void Dispose() { - lock(m_lock) { + bool dispose = false; + if (dispose) { Dispose(true); GC.SuppressFinalize(this); } } + ~RunnableComponent() { + Dispose(false); + } + + /// <summary> + /// Releases all resources used by the current component regardless of its + /// execution state. + /// </summary> + /// <param name="disposing">Indicates that the component is disposed + /// during a normal disposing or during GC.</param> protected virtual void Dispose(bool disposing) { - if (disposing) { - Safe.Dispose(m_cts); - } + } + + public void Initialize() { + var cookie = new object(); + if (MoveInitialize(cookie)) + ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie); + } + + /// <summary> + /// This method is used for initialization during a component creation. + /// </summary> + /// <param name="ct">A cancellation token for this operation</param> + /// <remarks> + /// This method should be used for short and mostly syncronous operations, + /// other operations which require time to run shoud be placed in + /// <see cref="StartInternal(CancellationToken)"/> method. + /// </remarks> + protected virtual Task InitializeInternalAsync(CancellationToken ct) { + return Task.CompletedTask; } public void Start(CancellationToken ct) { - lock(m_lock) { - switch (State) - { - - default: - throw new InvalidOperationException(); + var cookie = new object(); + if (MoveStart(cookie)) + ScheduleTask(StartInternal, ct, cookie); + } + + protected virtual Task StartInternal(CancellationToken ct) { + return Task.CompletedTask; + } + + public void Stop(CancellationToken ct) { + var cookie = new object(); + if (MoveStop(cookie)) + ScheduleTask(StopAsync, ct, cookie); + } + + async Task StopAsync(CancellationToken ct) { + m_current.Cancel(); + await Completion; + + ct.ThrowIfCancellationRequested(); + + await StopInternalAsync(ct); + } + + protected virtual Task StopInternalAsync(CancellationToken ct) { + return Task.CompletedTask; + } + + + #region state management + + bool MoveInitialize(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Created) + return false; + State = ExecutionState.Initializing; + m_cookie = cookie; + return true; + } + } + + bool MoveStart(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Ready) + return false; + State = ExecutionState.Starting; + m_cookie = cookie; + return true; + } + } + + bool MoveStop(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Starting && State != ExecutionState.Running) + return false; + State = ExecutionState.Stopping; + m_cookie = cookie; + return true; + } + } + + void MoveSuccess(object cookie) { + lock (m_lock) { + if (m_cookie != cookie) + return; + switch (State) { + case ExecutionState.Initializing: + State = ExecutionState.Ready; + break; + case ExecutionState.Starting: + State = ExecutionState.Running; + break; + case ExecutionState.Stopping: + State = ExecutionState.Stopped; + break; } } } - public void Stop(CancellationToken ct) { - throw new NotImplementedException(); + void MoveFailed(Exception err, object cookie) { + lock (m_lock) { + if (m_cookie != cookie) + return; + LastError = err; + State = ExecutionState.Failed; + } } - protected virtual Task StartImpl(CancellationToken ct) { + - return Task.CompletedTask; + protected async void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) { + try { + m_current = AsyncOperationDescriptor.Create(next, ct); + await m_current.Task; + MoveSuccess(cookie); + } catch (Exception e) { + MoveFailed(e, cookie); + } } + + #endregion } } \ No newline at end of file