Mercurial > pub > ImplabNet
view Implab/Components/RunnableComponent.cs @ 262:f1696cdc3d7a v3 v3.0.8
Added IInitializable.Initialize() overload
Added IRunnable.Start(), IRunnable.Stop() overloads
Fixed cancellation of the current operation when Stop() is called
More tests
author | cin |
---|---|
date | Mon, 16 Apr 2018 02:12:39 +0300 |
parents | 547a2fc0d93e |
children |
line wrap: on
line source
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Implab.Components {
    /// <summary>
    /// Base class for implementing components which support start and stop operations,
    /// such components may represent running services.
    /// </summary>
    /// <remarks>
    /// This class provides a basic lifecycle from the creation to the
    /// termination of the component.
    /// </remarks>
    public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {

        /// <summary>
        /// This class binds the <see cref="CancellationTokenSource"/> lifetime to the task:
        /// when the task completes the associated token source is disposed.
        /// </summary>
        class AsyncOperationDescriptor {

            /// <summary>
            /// A descriptor of an already completed operation which has no
            /// cancellation token source.
            /// </summary>
            public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

            readonly CancellationTokenSource m_cts;

            // set once the wrapped task has completed, after that the token
            // source is disposed and must not be touched
            bool m_done;

            /// <summary>
            /// The cancellation token of this operation, or
            /// <see cref="CancellationToken.None"/> when the descriptor has no token source.
            /// </summary>
            public CancellationToken Token {
                get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
            }

            /// <summary>
            /// The task which completes together with the wrapped operation, after the
            /// token source has been disposed.
            /// </summary>
            public Task Task { get; private set; }

            private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
                m_cts = cts;
                Task = Chain(task);
            }

            private AsyncOperationDescriptor() {
                Task = Task.CompletedTask;
            }

            /// <summary>
            /// Requests the cancellation of the operation, does nothing if the
            /// operation is already completed or has no token source.
            /// </summary>
            public void Cancel() {
                if (m_cts != null) {
                    // the lock prevents cancelling a token source which is being disposed
                    lock (m_cts) {
                        if (!m_done)
                            m_cts.Cancel();
                    }
                }
            }

            // Marks the operation as completed and disposes the token source.
            void Done() {
                if (m_cts != null) {
                    lock (m_cts) {
                        m_done = true;
                        m_cts.Dispose();
                    }
                } else {
                    m_done = true;
                }
            }

            // Awaits the specified task and releases the token source regardless
            // of the outcome, the outcome itself is propagated to the caller.
            async Task Chain(Task other) {
                try {
                    await other;
                } finally {
                    Done();
                }
            }

            /// <summary>
            /// Creates a token source, starts the operation using the specified factory
            /// and wraps the resulting task to the descriptor.
            /// </summary>
            /// <param name="factory">The factory which starts the operation, it is invoked
            /// synchronously with the token of the created token source.</param>
            /// <param name="ct">The external cancellation token, if it can be cancelled
            /// the created token source is linked to it.</param>
            /// <returns>The descriptor of the started operation.</returns>
            public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
                var cts = ct.CanBeCanceled ?
                    CancellationTokenSource.CreateLinkedTokenSource(ct) :
                    new CancellationTokenSource();

                Task task;
                try {
                    task = factory(cts.Token);
                } catch {
                    // the factory has failed synchronously, no task will ever
                    // release the token source, so release it here
                    cts.Dispose();
                    throw;
                }

                return new AsyncOperationDescriptor(task, cts);
            }
        }

        // this lock is used to synchronize state flow of the component during
        // processing calls from a client and internal processes.
        readonly object m_lock = new object();

        // current operation cookie, used to check whether a call to
        // MoveSuccess/MoveFailed method belongs to the current
        // operation, if cookies didn't match ignore completion result.
        object m_cookie;

        // AsyncOperationDescriptor aggregates a task and its cancellation token
        AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;

        ExecutionState m_state;

        /// <summary>
        /// The synchronization object is used to coordinate the access of the
        /// clients of the component and of the processes running inside the
        /// component to the shared state, i.e. to such properties as
        /// <see cref="State"/> and <see cref="LastError"/>. Handlers of the
        /// <see cref="StateChanged"/> event are invoked with this lock already
        /// held, therefore they don't require additional synchronization.
        /// </summary>
        public object SynchronizationObject { get { return m_lock; } }

        /// <summary>
        /// Creates the component either in the <see cref="ExecutionState.Ready"/>
        /// or in the <see cref="ExecutionState.Created"/> state.
        /// </summary>
        /// <param name="initialized"><c>true</c> if the component doesn't require
        /// the initialization and is ready to start.</param>
        protected RunnableComponent(bool initialized) {
            State = initialized ? ExecutionState.Ready : ExecutionState.Created;
        }

        /// <summary>
        /// The task of the most recently scheduled operation, a completed task
        /// if no operation has been scheduled yet.
        /// </summary>
        public Task Completion {
            get { return m_current.Task; }
        }

        /// <summary>
        /// The current state of the component, the <see cref="StateChanged"/>
        /// event is raised when the value actually changes.
        /// </summary>
        public ExecutionState State {
            get { return m_state; }
            private set {
                if (m_state != value) {
                    m_state = value;
                    StateChanged.DispatchEvent(this, new StateChangeEventArgs {
                        State = value,
                        LastError = LastError
                    });
                }
            }
        }

        /// <summary>
        /// The error which caused the last failed transition, if any.
        /// </summary>
        public Exception LastError { get; private set; }

        /// <summary>
        /// The event is raised when the state of the component changes. Handlers
        /// of this event are invoked while the <see cref="SynchronizationObject"/>
        /// lock is held and should complete as quickly as possible.
        /// </summary>
        public event EventHandler<StateChangeEventArgs> StateChanged;

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <remarks>
        /// Calling this method may lead to unexpected results if the component
        /// isn't in the stopped state. Call this method after the component is
        /// stopped if needed or if the component is in the failed state.
        /// </remarks>
        public void Dispose() {
            bool dispose = false;
            lock (SynchronizationObject) {
                if (m_state != ExecutionState.Disposed) {
                    dispose = true;
                    // the field is assigned directly, the StateChanged event
                    // isn't raised for this transition
                    m_state = ExecutionState.Disposed;
                    // the new cookie makes completions of pending operations ignored
                    m_cookie = new object();
                }
            }
            if (dispose) {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
        }

        ~RunnableComponent() {
            Dispose(false);
        }

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <param name="disposing">Indicates that the component is disposed
        /// during a normal disposing or during GC.</param>
        protected virtual void Dispose(bool disposing) {
        }

        /// <summary>
        /// Starts the initialization of the component without an external
        /// cancellation token.
        /// </summary>
        /// <exception cref="InvalidOperationException">The component isn't in the
        /// <see cref="ExecutionState.Created"/> state.</exception>
        public void Initialize() {
            Initialize(CancellationToken.None);
        }

        /// <summary>
        /// Starts the initialization of the component, the progress can be
        /// tracked using <see cref="Completion"/> and <see cref="State"/>.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <exception cref="InvalidOperationException">The component isn't in the
        /// <see cref="ExecutionState.Created"/> state.</exception>
        public void Initialize(CancellationToken ct) {
            var cookie = new object();
            if (MoveInitialize(cookie))
                Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, cookie));
            else
                throw new InvalidOperationException();
        }

        /// <summary>
        /// This method is used for initialization during a component creation.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <remarks>
        /// This method should be used for short and mostly synchronous operations,
        /// other operations which require time to run should be placed in
        /// <see cref="StartInternalAsync(CancellationToken)"/> method.
        /// </remarks>
        protected virtual Task InitializeInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts the component without an external cancellation token.
        /// </summary>
        /// <exception cref="InvalidOperationException">The component isn't in the
        /// <see cref="ExecutionState.Ready"/> state.</exception>
        public void Start() {
            Start(CancellationToken.None);
        }

        /// <summary>
        /// Starts the component, the progress can be tracked using
        /// <see cref="Completion"/> and <see cref="State"/>.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <exception cref="InvalidOperationException">The component isn't in the
        /// <see cref="ExecutionState.Ready"/> state.</exception>
        public void Start(CancellationToken ct) {
            var cookie = new object();
            if (MoveStart(cookie))
                Safe.NoWait(ScheduleStartAndRun(ct, cookie));
            else
                throw new InvalidOperationException();
        }

        // Runs the start operation and then RunInternal, any error is passed
        // to Fail which has an effect only in the running state.
        async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
            try {
                await ScheduleTask(StartInternalAsync, ct, cookie);
                RunInternal();
            } catch (Exception err) {
                Fail(err);
            }
        }

        /// <summary>
        /// Override this method to perform the actual start of the component,
        /// the default implementation completes immediately.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        protected virtual Task StartInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// This method is called after the start operation has completed
        /// successfully, normally the component has entered the running state
        /// by that moment. The default implementation does nothing.
        /// </summary>
        /// <remarks>
        /// An exception thrown from this method is passed to <see cref="Fail(Exception)"/>.
        /// NOTE(review): if <see cref="Stop()"/> is requested while the component is
        /// starting and the start operation still completes successfully, this method
        /// seems to be called anyway - confirm whether this is intended.
        /// </remarks>
        protected virtual void RunInternal() {
        }

        /// <summary>
        /// Stops the component without an external cancellation token.
        /// </summary>
        /// <exception cref="InvalidOperationException">The component is neither in the
        /// <see cref="ExecutionState.Starting"/> nor in the
        /// <see cref="ExecutionState.Running"/> state.</exception>
        public void Stop() {
            Stop(CancellationToken.None);
        }

        /// <summary>
        /// Stops the component, the current operation is cancelled and awaited
        /// before the actual stop is performed.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <exception cref="InvalidOperationException">The component is neither in the
        /// <see cref="ExecutionState.Starting"/> nor in the
        /// <see cref="ExecutionState.Running"/> state.</exception>
        public void Stop(CancellationToken ct) {
            var cookie = new object();
            if (MoveStop(cookie))
                Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
            else
                throw new InvalidOperationException();
        }

        // Cancels the previous operation, waits for it to complete and then
        // performs the actual stop. This method is started synchronously by
        // ScheduleTask before m_current is replaced, therefore m_current and
        // Completion still refer to the previous operation here.
        async Task StopAsync(CancellationToken ct) {
            m_current.Cancel();

            try {
                await Completion;
            } catch (OperationCanceledException) {
                // OK
            }

            ct.ThrowIfCancellationRequested();

            await StopInternalAsync(ct);
        }

        /// <summary>
        /// Override this method to perform the actual stop of the component,
        /// the default implementation completes immediately.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        protected virtual Task StopInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Moves the running component to the <see cref="ExecutionState.Failed"/>
        /// state, does nothing if the component isn't in the
        /// <see cref="ExecutionState.Running"/> state.
        /// </summary>
        /// <param name="err">The error to store in <see cref="LastError"/>.</param>
        protected void Fail(Exception err) {
            lock (m_lock) {
                if (m_state != ExecutionState.Running)
                    return;
                // the new cookie makes completions of pending operations ignored
                m_cookie = new object();
                LastError = err;
                State = ExecutionState.Failed;
            }
        }

        #region state management

        // Created -> Initializing, returns false if the transition isn't allowed.
        bool MoveInitialize(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Created)
                    return false;
                State = ExecutionState.Initializing;
                m_cookie = cookie;
                return true;
            }
        }

        // Ready -> Starting, returns false if the transition isn't allowed.
        bool MoveStart(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Ready)
                    return false;
                State = ExecutionState.Starting;
                m_cookie = cookie;
                return true;
            }
        }

        // Starting | Running -> Stopping, returns false if the transition isn't allowed.
        bool MoveStop(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Starting && State != ExecutionState.Running)
                    return false;
                State = ExecutionState.Stopping;
                m_cookie = cookie;
                return true;
            }
        }

        // Completes the transition started with the specified cookie, the call
        // is ignored if the cookie doesn't belong to the current operation.
        void MoveSuccess(object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;
                switch (State) {
                    case ExecutionState.Initializing:
                        State = ExecutionState.Ready;
                        break;
                    case ExecutionState.Starting:
                        State = ExecutionState.Running;
                        break;
                    case ExecutionState.Stopping:
                        State = ExecutionState.Stopped;
                        break;
                }
            }
        }

        // Moves the component to the failed state if the cookie belongs to the
        // current operation, returns false if the failure has been ignored.
        bool MoveFailed(Exception err, object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return false;
                // LastError is assigned first to be available in the StateChanged event
                LastError = err;
                State = ExecutionState.Failed;
                return true;
            }
        }

        // Starts the specified operation and makes it the current one, the
        // result of the operation is reflected in the state of the component
        // when the cookie still belongs to the current operation.
        //
        // The operation is started synchronously inside Create, m_current is
        // replaced only after that, StopAsync relies on this order.
        //
        // NOTE(review): m_current is assigned outside of m_lock, a concurrent
        // Stop() may observe the previous operation here - confirm.
        Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {

            var op = AsyncOperationDescriptor.Create(async (x) => {
                try {
                    await next(x);
                    MoveSuccess(cookie);
                } catch (Exception e) {
                    MoveFailed(e, cookie);
                    throw;
                }
            }, ct);

            m_current = op;
            return op.Task;
        }

        #endregion
    }
}