view Implab/Components/PollingComponent.cs @ 262:f1696cdc3d7a v3 v3.0.8

Added IInitializable.Initialize() overload Added IRunnable.Start(), IRunnable.Start() overloads Fixed cancellation of the current operation when Stop() is called More tests
author cin
date Mon, 16 Apr 2018 02:12:39 +0300
parents 547a2fc0d93e
children
line wrap: on
line source

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Implab.Components {
    /// <summary>
    /// Base class for runnable components which repeatedly invoke
    /// <see cref="Poll(CancellationToken)"/>. A one-shot timer triggers each
    /// poll and is re-armed only after the previous poll has finished, so polls
    /// scheduled by this class do not overlap.
    /// </summary>
    public abstract class PollingComponent : RunnableComponent {

        // One-shot timer, re-armed by ScheduleNextPoll after every poll.
        readonly Timer m_timer;

        // Cancelled when the component is stopped; the token is passed to Poll.
        readonly CancellationTokenSource m_cancellation = new CancellationTokenSource();

        // Gate task created for the next poll; the timer callback starts it.
        Task m_pending;

        // m_pending chained with the Poll call; awaited by the timer callback
        // and by StopInternalAsync.
        Task m_poll;

        /// <summary>
        /// Poll interval in milliseconds.
        /// </summary>
        /// <value>The delay between the end of one poll and the start of the next one.</value>
        public int Interval { get; set; }

        /// <summary>
        /// Delay to the first poll after start in milliseconds.
        /// </summary>
        /// <value>The due time used when the first poll is scheduled.</value>
        public int Delay { get; set; }

        /// <summary>
        /// Indicates how to handle unhandled exceptions in the
        /// <see cref="Poll(CancellationToken)"/> method.
        /// </summary>
        /// <value>
        /// <c>true</c> to pass the exception to <c>Fail</c>; <c>false</c> to
        /// schedule the next poll after <see cref="Interval"/> milliseconds.
        /// </value>
        public bool FailOnError { get; set; }

        /// <summary>
        /// Event for the unhandled exceptions in the
        /// <see cref="Poll(CancellationToken)"/> method. It is raised before
        /// <see cref="FailOnError"/> is applied.
        /// </summary>
        public event EventHandler<UnhandledExceptionEventArgs> UnhandledException;

        /// <summary>
        /// Creates the component and its timer; the timer is not armed until
        /// the component is run.
        /// </summary>
        /// <param name="initialized">Passed through to the base class constructor.</param>
        protected PollingComponent(bool initialized) : base(initialized) {
            m_timer = new Timer(OnTimer);
        }

        /// <summary>
        /// Schedules the first poll after <see cref="Delay"/> milliseconds.
        /// </summary>
        protected override void RunInternal() {
            ScheduleNextPoll(Delay);
        }

        /// <summary>
        /// Cancels the scheduled or running poll and waits until it is finished.
        /// </summary>
        /// <param name="ct">
        /// Cancellation token of the stop operation. NOTE(review): it is not
        /// observed here, the method always waits for the current poll.
        /// </param>
        /// <returns>A task which completes when the current poll has ended.</returns>
        protected override async Task StopInternalAsync(CancellationToken ct) {
            // component in Stopping state, no new polls will be scheduled

            // Disarm the timer first: a poll which is still waiting for its due
            // time must not fire after the cancellation below, otherwise the
            // timer callback would try to start an already cancelled task.
            m_timer.Change(Timeout.Infinite, Timeout.Infinite);

            m_cancellation.Cancel();
            try {
                // await for pending poll
                if (m_poll != null)
                    await m_poll;
            } catch (OperationCanceledException) {
                // OK, the poll has been cancelled by the request above
            }
        }

        /// <summary>
        /// Performs a single poll.
        /// </summary>
        /// <param name="ct">Token which is cancelled when the component is stopped.</param>
        /// <returns>A task which represents the poll operation.</returns>
        protected abstract Task Poll(CancellationToken ct);

        /// <summary>
        /// Prepares the next poll and arms the timer. Does nothing unless the
        /// component is in the <c>Running</c> state.
        /// </summary>
        /// <param name="timeout">Timer due time in milliseconds.</param>
        void ScheduleNextPoll(int timeout) {
            lock (SynchronizationObject) {
                if (State == ExecutionState.Running) {
                    m_pending = Safe.CreateTask(m_cancellation.Token);
                    m_poll = m_pending.Then(() => Poll(m_cancellation.Token));
                    m_timer.Change(timeout, Timeout.Infinite);
                }
            }
        }

        /// <summary>
        /// Timer callback: runs the scheduled poll and then schedules the next
        /// one. It is <c>async void</c> because <see cref="TimerCallback"/>
        /// requires a <c>void</c> method, therefore no exception may escape it.
        /// </summary>
        /// <param name="state">Timer state object, not used.</param>
        async void OnTimer(object state) {
            try {
                // The stop could have been requested after the timer was armed.
                if (m_cancellation.IsCancellationRequested)
                    return;

                try {
                    m_pending.Start();
                } catch (InvalidOperationException) {
                    // Presumably the gate task is bound to the cancellation
                    // token and was cancelled between the check above and this
                    // call; this is a regular stop, not a poll error.
                    if (m_cancellation.IsCancellationRequested)
                        return;
                    throw;
                }

                await m_poll;
                ScheduleNextPoll(Interval);
            } catch (OperationCanceledException e) {
                // A cancellation requested through m_cancellation is the normal
                // way to stop the poll, it must not be reported as an error or
                // fail the component. Any other cancellation is unexpected.
                if (!m_cancellation.IsCancellationRequested)
                    HandlePollError(e);
            } catch (Exception e) {
                HandlePollError(e);
            }
        }

        /// <summary>
        /// Reports the poll error through <see cref="UnhandledException"/> and
        /// then either fails the component or schedules the next poll,
        /// depending on <see cref="FailOnError"/>.
        /// </summary>
        /// <param name="e">The exception raised by the poll.</param>
        void HandlePollError(Exception e) {
            UnhandledException.DispatchEvent(this, new UnhandledExceptionEventArgs(e, false));

            if (FailOnError)
                Fail(e);
            else
                ScheduleNextPoll(Interval);
        }

        /// <summary>
        /// Releases the timer and the cancellation token source.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from an explicit dispose; only then the
        /// managed resources are released.
        /// </param>
        protected override void Dispose(bool disposing) {
            if (disposing)
                Safe.Dispose(m_timer, m_cancellation);
            base.Dispose(disposing);
        }

    }
}