view Implab/Parallels/DispatchPool.cs @ 181:b2b6a6640aa3 ref20160224

minor fixes and debug
author cin
date Thu, 24 Mar 2016 03:54:46 +0300
parents f803565868a4
children
line wrap: on
line source

using System;
using System.Threading;

namespace Implab.Parallels {
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreadsLimit;
        readonly int m_maxThreadsLimit;
        readonly int m_releaseTimeout = 1000; // the timeout while the working thread will wait for the new tasks before exit

        int m_threads; // the current size of the pool
        int m_maxRunningThreads; // the meximum reached size of the pool
        int m_exit; // the pool is going to shutdown, all unused workers are released

        readonly object m_signal = new object(); // used to pulse waiting threads

        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreadsLimit = min;
            m_maxThreadsLimit = max;
        }

        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        protected DispatchPool() {

            m_minThreadsLimit = 0;
            m_maxThreadsLimit = Environment.ProcessorCount;
        }

        protected void InitPool() {
            for (int i = 0; i < m_minThreadsLimit; i++)
                StartWorker();
        }

        public int PoolSize {
            get {
                Thread.MemoryBarrier();
                return m_threads;
            }
        }
            
        public int MaxRunningThreads {
            get {
                Thread.MemoryBarrier();
                return m_maxRunningThreads;
            }
        }

        protected bool IsDisposed {
            get {
                Thread.MemoryBarrier();
                return m_exit == 1;
            }
        }

        protected abstract bool TryDequeue(out TUnit unit);

        bool Dequeue(out TUnit unit, int timeout) {
            int ts = Environment.TickCount;
            if (TryDequeue(out unit))
                return true;
            lock (m_signal) {
                while (!TryDequeue(out unit) && m_exit == 0)
                    if(!Monitor.Wait(m_signal, Math.Max(0, ts + timeout - Environment.TickCount))) {
                        // timeout
                        return false;
                    }
                // queue item or terminate
                Monitor.Pulse(m_signal);
                if (m_exit == 1)
                    return false;
            }
            return true;
        }

        protected void SignalThread() {
            lock (m_signal) {
                Monitor.Pulse(m_signal);
            }
        }

        #region thread slots traits

        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_threads;
                if (current >= m_maxThreadsLimit || m_exit == 1)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_threads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release slot for the new thread
            Thread.MemoryBarrier();
            do {
                current = m_threads;
                if (current <= m_minThreadsLimit && m_exit == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_threads, current - 1, current));

            last = (current == 1);

            return true;
        }

        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        protected bool StartWorker() {
            if (AllocateThreadSlot()) {
                // slot successfully allocated
                var worker = new Thread(Worker);
                worker.IsBackground = true;
                worker.Start();

                return true;
            }
            return false;
        }

        protected abstract void InvokeUnit(TUnit unit);

        protected virtual void Worker() {
            TUnit unit;
            bool last;
            do {
                while (Dequeue(out unit, m_releaseTimeout)) {
                    InvokeUnit(unit);
                }
                if(!ReleaseThreadSlot(out last))
                    continue;
                // queue may be not empty
                if (last && TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    if (AllocateThreadSlot(1))
                        continue;
                    // we can safely exit since pool is alive
                }
                break;
            } while(true);
        }


        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                if (0 == Interlocked.CompareExchange(ref m_exit, 1, 0)) { // implies memory barrier
                    // wake sleeping threads
                    SignalThread();
                    GC.SuppressFinalize(this);
                }
            }
        }

        public void Dispose() {
            Dispose(true);
        }

        ~DispatchPool() {
            Dispose(false);
        }
    }
}