view Implab/Components/RunnableComponent.cs @ 259:7d52dc684bbd v3

PollingComponent: implemented correct stopping
author cin
date Fri, 13 Apr 2018 03:57:39 +0300
parents 440801d88019
children 547a2fc0d93e
line wrap: on
line source

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace Implab.Components {
    /// <summary>
    /// Base class for implementing components which support start and stop operations,
    /// such components may represent running services.
    /// </summary>
    /// <remarks>
    /// This class provides a basic lifecycle from the creation to the
    /// termination of the component.
    /// </remarks>
    public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {

        /// <summary>
        /// This class bounds <see cref="CancellationTokenSource"/> lifetime to the task,
        /// when the task completes the associated token source will be disposed.
        /// </summary>
        class AsyncOperationDescriptor {

            public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

            readonly CancellationTokenSource m_cts;

            bool m_done;

            public CancellationToken Token {
                get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
            }

            public Task Task { get; private set; }

            private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
                m_cts = cts;
                Task = Chain(task);
            }

            private AsyncOperationDescriptor() {
                Task = Task.CompletedTask;
            }

            public void Cancel() {
                if (m_cts != null) {
                    lock (m_cts) {
                        if (!m_done)
                            m_cts.Cancel();
                    }
                }
            }

            void Done() {
                if (m_cts != null) {
                    lock (m_cts) {
                        m_done = true;
                        m_cts.Dispose();
                    }
                } else {
                    m_done = true;
                }
            }

            async Task Chain(Task other) {
                try {
                    await other;
                } finally {
                    Done();
                }
            }

            public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
                var cts = ct.CanBeCanceled ?
                    CancellationTokenSource.CreateLinkedTokenSource(ct) :
                    new CancellationTokenSource();

                return new AsyncOperationDescriptor(factory(cts.Token), cts);
            }

        }

        // this lock is used to synchronize state flow of the component during
        // processing calls from a client and internal processes.
        readonly object m_lock = new object();

        // current operation cookie, used to check wheather a call to
        // MoveSuccess/MoveFailed method belongs to the current
        // operation, if cookies didn't match ignore completion result.
        object m_cookie;

        // AsyncOperationDscriptor aggregates a task and it's cancellation token
        AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;

        ExecutionState m_state;

        /// <summary>
        /// Объект синхронизации используется для обеспечения совместного доступа
        /// клиента компоненты и процессов, протекающих внутри компоненты, к общему
        /// состоянию, т.е.true таким свойствам, как <see cref="State"/>,
        /// <see cref="LastError"/>. Обработчики события <see cref="StateChanged"/>
        /// вызываются уже с установленной блокировкой, поэтому дополнительная
        /// синхронизация не требуется.
        /// </summary>
        public object SynchronizationObject { get { return m_lock; } }

        protected RunnableComponent(bool initialized) {
            State = initialized ? ExecutionState.Ready : ExecutionState.Created;
        }

        public Task Completion {
            get { return m_current.Task; }
        }

        public ExecutionState State {
            get { return m_state; }
            private set {
                if (m_state != value) {
                    m_state = value;
                    StateChanged.DispatchEvent(this, new StateChangeEventArgs {
                        State = value,
                        LastError = LastError
                    });
                }
            }
        }

        public Exception LastError { get; private set; }

        /// <summary>
        /// Событие изменения состояния компоненты.see Обработчики данного события
        /// вызываются внутри блокировки <see cref="SynchronizationObject"/> и должны
        /// выполняться максимально быстро.
        /// </summary>
        public event EventHandler<StateChangeEventArgs> StateChanged;

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <remarks>
        /// Calling to this method may result unexpedted results if the component
        /// isn't in the stopped state. Call this method after the component is
        /// stopped if needed or if the component is in the failed state.
        /// </remarks>
        public void Dispose() {
            bool dispose = false;
            lock (SynchronizationObject) {
                if (m_state != ExecutionState.Disposed) {
                    dispose = true;
                    m_state = ExecutionState.Disposed;
                    m_cookie = new object();
                }
            }
            if (dispose) {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
        }

        ~RunnableComponent() {
            Dispose(false);
        }

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <param name="disposing">Indicates that the component is disposed
        /// during a normal disposing or during GC.</param>
        protected virtual void Dispose(bool disposing) {
        }

        public void Initialize() {
            var cookie = new object();
            if (MoveInitialize(cookie))
                Safe.NoWait(ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie));
            else
                throw new InvalidOperationException();
        }

        /// <summary>
        /// This method is used for initialization during a component creation.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <remarks>
        /// This method should be used for short and mostly syncronous operations,
        /// other operations which require time to run shoud be placed in
        /// <see cref="StartInternalAsync(CancellationToken)"/> method.
        /// </remarks>
        protected virtual Task InitializeInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        public void Start(CancellationToken ct) {
            var cookie = new object();
            if (MoveStart(cookie))
                Safe.NoWait(ScheduleStartAndRun(ct, cookie));
            else
                throw new InvalidOperationException();
        }

        async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
            try {
                await ScheduleTask(StartInternalAsync, ct, cookie);
                RunInternal();
            } catch (Exception err) {
                Fail(err);
            }
        }

        protected virtual Task StartInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// This method is called after the component is enetered running state,
        /// use this method to 
        /// </summary>
        protected virtual void RunInternal() {

        }

        public void Stop(CancellationToken ct) {
            var cookie = new object();
            if (MoveStop(cookie))
                Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
            else
                throw new InvalidOperationException();
        }

        async Task StopAsync(CancellationToken ct) {
            m_current.Cancel();
            await Completion;

            ct.ThrowIfCancellationRequested();

            await StopInternalAsync(ct);
        }

        protected virtual Task StopInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        protected void Fail(Exception err) {
            lock(m_lock) {
                if (m_state != ExecutionState.Running)
                    return;
                m_cookie = new object();
                LastError = err;
                State = ExecutionState.Failed;
            }
        }


        #region state management

        bool MoveInitialize(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Created)
                    return false;
                State = ExecutionState.Initializing;
                m_cookie = cookie;
                return true;
            }
        }

        bool MoveStart(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Ready)
                    return false;
                State = ExecutionState.Starting;
                m_cookie = cookie;
                return true;
            }
        }

        bool MoveStop(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Starting && State != ExecutionState.Running)
                    return false;
                State = ExecutionState.Stopping;
                m_cookie = cookie;
                return true;
            }
        }

        void MoveSuccess(object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;
                switch (State) {
                    case ExecutionState.Initializing:
                        State = ExecutionState.Ready;
                        break;
                    case ExecutionState.Starting:
                        State = ExecutionState.Running;
                        break;
                    case ExecutionState.Stopping:
                        State = ExecutionState.Stopped;
                        break;
                }
            }
        }

        void MoveFailed(Exception err, object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;
                LastError = err;
                State = ExecutionState.Failed;
            }
        }

        Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {

            var op = AsyncOperationDescriptor.Create(async (x) => {
                try {
                    await next(x);
                    MoveSuccess(cookie);
                } catch (Exception e) {
                    MoveFailed(e, cookie);
                }
            }, ct);

            m_current = op;
            return op.Task;
        }

        #endregion
    }
}