view Implab/Parallels/DispatchPool.cs @ 196:40d7fed4a09e

Fixed promise chaining behavior: the error handler no longer handles exceptions thrown by result or cancellation handlers; these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents f803565868a4
children
line wrap: on
line source

using System;
using System.Threading;

namespace Implab.Parallels {
    public abstract class DispatchPool<TUnit> : IDisposable {
        // Lower bound of the pool size: InitPool starts this many workers and
        // ReleaseThreadSlot refuses to go below it while the pool is alive.
        readonly int m_minThreadsLimit;
        // Upper bound of the pool size checked by AllocateThreadSlot().
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // how long (in milliseconds) an idle worker waits for a new unit before it tries to exit

        int m_threads; // the current size of the pool (number of allocated thread slots)
        int m_maxRunningThreads; // the maximum size the pool has reached so far
        int m_exit; // 0 while the pool is alive; set to 1 by Dispose to request shutdown, after which unused workers are released

        readonly object m_signal = new object(); // monitor on which idle workers wait and through which they are pulsed

        /// <summary>
        /// Creates a pool whose size is kept between <paramref name="min"/> and <paramref name="max"/> threads.
        /// No threads are started here.
        /// </summary>
        /// <param name="min">The number of reserved threads; must not be negative. A value above <paramref name="max"/> is lowered to <paramref name="max"/>.</param>
        /// <param name="max">The largest number of threads the pool may own; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
        protected DispatchPool(int min, int max) {
            if (min < 0) {
                throw new ArgumentOutOfRangeException("min");
            }
            if (max <= 0) {
                throw new ArgumentOutOfRangeException("max");
            }

            // an oversized minimum is silently capped by the maximum
            m_minThreadsLimit = Math.Min(min, max);
            m_maxThreadsLimit = max;
        }

        /// <summary>
        /// Creates a pool of a fixed size: <paramref name="threads"/> is used as both the minimum and the maximum limit.
        /// </summary>
        /// <param name="threads">The number of threads; passed as both arguments to the two-parameter constructor, which validates it.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates a pool with no reserved threads and a maximum of <see cref="Environment.ProcessorCount"/> threads.
        /// </summary>
        protected DispatchPool()
            : this(0, Environment.ProcessorCount) {
            // the limits are assigned by the two-parameter constructor
        }

        /// <summary>
        /// Requests the reserved workers: calls <c>StartWorker</c> once per thread of the minimum limit.
        /// </summary>
        /// <remarks>The result of each <c>StartWorker</c> call is ignored.</remarks>
        protected void InitPool() {
            int pending = m_minThreadsLimit;
            while (pending > 0) {
                StartWorker();
                pending--;
            }
        }

        /// <summary>
        /// Gets the current size of the pool (the number of allocated thread slots).
        /// </summary>
        public int PoolSize {
            get {
                // full fence first, then read the counter
                Thread.MemoryBarrier();
                int size = m_threads;
                return size;
            }
        }
            
        /// <summary>
        /// Gets the largest size the pool has reached so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                // full fence first, then read the recorded peak
                Thread.MemoryBarrier();
                int peak = m_maxRunningThreads;
                return peak;
            }
        }

        /// <summary>
        /// Gets a value indicating whether shutdown has been requested, i.e. the exit flag equals 1.
        /// </summary>
        protected bool IsDisposed {
            get {
                // full fence first, then read the exit flag
                Thread.MemoryBarrier();
                bool exiting = (m_exit == 1);
                return exiting;
            }
        }

        /// <summary>
        /// Attempts to take the next unit of work from the queue owned by the derived class.
        /// </summary>
        /// <param name="unit">Receives the unit when the method returns <c>true</c>.</param>
        /// <returns><c>true</c> if a unit was obtained; otherwise <c>false</c>.</returns>
        /// <remarks>
        /// Called by worker threads both while holding the pool's signal lock and without it,
        /// so several workers may call it at the same time.
        /// NOTE(review): callers appear to expect a non-blocking call - confirm in implementations.
        /// </remarks>
        protected abstract bool TryDequeue(out TUnit unit);

        /// <summary>
        /// Takes the next unit from the queue, waiting up to <paramref name="timeout"/> milliseconds for one to arrive.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
        /// <param name="timeout">The maximum total time to wait, in milliseconds.</param>
        /// <returns>
        /// <c>true</c> if a unit was dequeued; <c>false</c> if the wait timed out or the pool
        /// is shutting down and no unit could be taken.
        /// </returns>
        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;

            // fast path: skip the lock when a unit is already available
            if (TryDequeue(out unit))
                return true;

            bool dequeued;
            lock (m_signal) {
                while (!(dequeued = TryDequeue(out unit)) && m_exit == 0) {
                    // elapsed time is computed with wrap-around arithmetic, so it stays correct
                    // when Environment.TickCount overflows, even in a checked build
                    int remaining = unchecked(timeout - (Environment.TickCount - ts));
                    if (!Monitor.Wait(m_signal, Math.Max(0, remaining))) {
                        // timeout
                        return false;
                    }
                }
                // a unit was taken or the pool is terminating: pass the signal on to the next waiter
                Monitor.Pulse(m_signal);
            }

            // Report what actually happened to the queue. A unit that has already been removed
            // must be handed to the caller even during shutdown, otherwise it would be lost;
            // the fast path above behaves the same way.
            return dequeued;
        }

        /// <summary>
        /// Pulses the pool's signal object so that a worker waiting on it can wake up.
        /// </summary>
        protected void SignalThread() {
            object gate = m_signal;
            lock (gate) {
                // Pulse must be called while the monitor is held
                Monitor.Pulse(gate);
            }
        }

        #region thread slots traits

        /// <summary>
        /// Reserves a slot for one more worker thread by incrementing the pool size.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the slot was reserved; <c>false</c> if the pool is already at its
        /// maximum size or shutdown has been requested.
        /// </returns>
        bool AllocateThreadSlot() {
            // lock-free retry loop: repeat until the increment is applied to an unchanged value
            for (;;) {
                int observed = m_threads;

                // no more slots left or the pool has been disposed
                if (observed >= m_maxThreadsLimit || m_exit == 1)
                    return false;

                if (Interlocked.CompareExchange(ref m_threads, observed + 1, observed) == observed) {
                    // record the new size if it is a new peak
                    UpdateMaxThreads(observed + 1);
                    return true;
                }
            }
        }

        /// <summary>
        /// Reserves a slot only if doing so moves the pool size from <paramref name="desired"/> - 1
        /// to exactly <paramref name="desired"/>.
        /// </summary>
        /// <param name="desired">The pool size expected after the allocation.</param>
        /// <returns><c>true</c> if the size was changed; <c>false</c> if the current size was not <paramref name="desired"/> - 1.</returns>
        bool AllocateThreadSlot(int desired) {
            int expected = desired - 1;
            int previous = Interlocked.CompareExchange(ref m_threads, desired, expected);
            if (previous != expected) {
                return false;
            }

            UpdateMaxThreads(desired);
            return true;
        }

        /// <summary>
        /// Gives back the slot of the calling worker by decrementing the pool size.
        /// </summary>
        /// <param name="last">Set to <c>true</c> when the released slot was the only one left (the size went from 1 to 0); otherwise <c>false</c>.</param>
        /// <returns>
        /// <c>true</c> if the slot was released; <c>false</c> if the pool is alive and already
        /// at or below its minimum size, in which case the thread stays reserved.
        /// </returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            Thread.MemoryBarrier();

            // lock-free retry loop: repeat until the decrement is applied to an unchanged value
            for (;;) {
                int observed = m_threads;

                // the thread is reserved
                if (observed <= m_minThreadsLimit && m_exit == 0)
                    return false;

                if (Interlocked.CompareExchange(ref m_threads, observed - 1, observed) == observed) {
                    last = (observed == 1);
                    return true;
                }
            }
        }

        /// <summary>
        /// Raises the recorded peak pool size to <paramref name="count"/> if it is currently lower.
        /// </summary>
        /// <param name="count">The pool size that has just been reached.</param>
        void UpdateMaxThreads(int count) {
            for (;;) {
                int recorded = m_maxRunningThreads;

                // the recorded peak is already high enough
                if (recorded >= count)
                    return;

                // retry only when another thread changed the peak in between
                if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, recorded) == recorded)
                    return;
            }
        }

        #endregion

        /// <summary>
        /// Starts one more background worker thread if a thread slot can be allocated.
        /// </summary>
        /// <returns>
        /// <c>true</c> if a worker was started; <c>false</c> if no slot could be allocated
        /// (the pool is at its maximum size or is shutting down).
        /// </returns>
        /// <remarks>
        /// If the thread cannot be created or started, the allocated slot is given back and the
        /// exception is rethrown. The recorded peak size may still include the failed attempt.
        /// </remarks>
        protected bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            try {
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // no thread is running for this slot; without this rollback the pool would
                // keep counting a worker that does not exist
                Interlocked.Decrement(ref m_threads);
                throw;
            }

            return true;
        }

        /// <summary>
        /// Executes a unit of work that a worker thread has taken from the queue.
        /// </summary>
        /// <param name="unit">The dequeued unit.</param>
        /// <remarks>
        /// The worker loop calls this method without any exception handling around it,
        /// so an exception thrown here propagates out of the worker.
        /// </remarks>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: executes queued units until the thread's slot
        /// can be released, then returns.
        /// </summary>
        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                // run units until Dequeue returns false
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }
                // try to give the slot back; a refusal means this thread is reserved,
                // so it goes back to waiting for units
                if(!ReleaseThreadSlot(out last))
                    continue;
                // the slot is released, but the queue may be not empty: if this was the last
                // worker, make one more attempt so a unit queued in the meantime is not left behind
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    // re-acquire the slot only if the pool size is still zero
                    if (AllocateThreadSlot(1))
                        continue;
                    // the pool size is no longer zero, i.e. another slot has been allocated
                    // in the meantime, so this thread can safely exit
                }
                break;
            } while(true);
        }


        /// <summary>
        /// Requests the pool shutdown when called with <paramref name="disposing"/> set to <c>true</c>.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <c>Dispose()</c>; <c>false</c> when called from the finalizer, in which case nothing is done here.</param>
        protected virtual void Dispose(bool disposing) {
            if (!disposing)
                return;

            // only the call that flips the flag from 0 to 1 proceeds; CompareExchange implies a memory barrier
            bool alreadyExiting = Interlocked.CompareExchange(ref m_exit, 1, 0) != 0;
            if (alreadyExiting)
                return;

            // wake sleeping threads
            SignalThread();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Shuts the pool down by calling <c>Dispose(true)</c>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        /// <summary>
        /// Finalizer: calls <c>Dispose(false)</c>, which does nothing in this class
        /// but may be overridden by derived classes.
        /// </summary>
        ~DispatchPool() {
            Dispose(false);
        }
    }
}