view Implab/Parallels/DispatchPool.cs @ 33:b255e4aeef17

Removed the reference to the parent from the promise object; this allows resolved promises to release the parents and results they are holding. Added a complete set of operations to the IPromiseBase interface. Subscribing to the cancellation event of a promise should not affect its IsExclusive property. More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 2fad2d1f4b03
children dabf79fde388
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// Base class for a pool of background worker threads. Each worker repeatedly pulls
    /// a unit of work with <see cref="TryDequeue"/> and runs it with <see cref="InvokeUnit"/>.
    /// When no work is available the worker goes to sleep; after the release timeout it
    /// leaves the pool unless it is one of the reserved minimum threads.
    /// </summary>
    /// <typeparam name="TUnit">The type of the unit of work handled by the pool.</typeparam>
    public abstract class DispatchPool<TUnit> : IDisposable {
        readonly int m_minThreads; // workers below this count are never released while the pool is alive
        readonly int m_maxThreads; // upper bound for the number of worker threads

        int m_createdThreads = 0; // the current size of the pool
        int m_activeThreads = 0; // the count of threads which are active
        int m_sleepingThreads = 0; // the count of currently inactive threads
        int m_maxRunningThreads = 0; // the maximum reached size of the pool
        int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
        readonly int m_releaseTimeout = 100; // the time (ms) an idle worker waits for new tasks before it exits
        int m_wakeEvents = 0; // the count of wake events

        // auto-reset event the idle workers wait on; see SignalThread/FetchSignalOrWait
        readonly AutoResetEvent m_hasTasks = new AutoResetEvent(false);

        /// <summary>
        /// Creates a pool with the specified bounds. No threads are started here,
        /// see <see cref="InitPool"/>.
        /// </summary>
        /// <param name="min">The number of reserved threads; values greater than <paramref name="max"/> are reduced to it.</param>
        /// <param name="max">The maximum number of threads in the pool.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="min"/> is negative or <paramref name="max"/> is not positive.
        /// </exception>
        protected DispatchPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreads = min;
            m_maxThreads = max;
        }

        /// <summary>
        /// Creates a pool whose minimum and maximum sizes are both <paramref name="threads"/>.
        /// </summary>
        /// <param name="threads">The number of threads in the pool.</param>
        protected DispatchPool(int threads)
            : this(threads, threads) {
        }

        /// <summary>
        /// Creates a pool without reserved threads whose maximum size is the maximum
        /// number of worker threads reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        protected DispatchPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreads = 0;
            m_maxThreads = maxThreads;
        }

        /// <summary>
        /// Tries to start one worker for every reserved (minimum) thread.
        /// </summary>
        protected void InitPool() {
            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// The current number of allocated thread slots.
        /// </summary>
        public int PoolSize {
            get {
                return m_createdThreads;
            }
        }

        /// <summary>
        /// The number of workers which are currently not suspended.
        /// </summary>
        public int ActiveThreads {
            get {
                return m_activeThreads;
            }
        }

        /// <summary>
        /// The largest pool size recorded so far.
        /// </summary>
        public int MaxRunningThreads {
            get {
                return m_maxRunningThreads;
            }
        }

        /// <summary>
        /// <c>true</c> once the shutdown of the pool has been requested.
        /// </summary>
        protected bool IsDisposed {
            get {
                return m_exitRequired != 0;
            }
        }

        /// <summary>
        /// Fetches the next unit of work for a worker thread.
        /// </summary>
        /// <param name="unit">Receives the fetched unit.</param>
        /// <returns><c>true</c> if a unit was fetched; <c>false</c> makes the worker suspend.</returns>
        protected abstract bool TryDequeue(out TUnit unit);

        #region thread execution traits
        /// <summary>
        /// Registers one wake signal. The event is set only when the counter goes
        /// from zero to one; further signals are passed along by the woken threads,
        /// see <see cref="FetchSignalOrWait"/>.
        /// </summary>
        /// <returns>The number of pending wake signals including the new one.</returns>
        int SignalThread() {
            var signals = Interlocked.Increment(ref m_wakeEvents);
            if(signals == 1)
                m_hasTasks.Set();
            return signals;
        }

        /// <summary>
        /// Takes one pending wake signal, waiting for it if there are none.
        /// </summary>
        /// <param name="timeout">
        /// The total time to wait in milliseconds, <c>-1</c> to wait indefinitely,
        /// <c>0</c> to only check for an already pending signal.
        /// </param>
        /// <returns><c>true</c> if a signal has been taken, <c>false</c> if the wait timed out.</returns>
        bool FetchSignalOrWait(int timeout) {
            var start = Environment.TickCount;

            // The requested timeout stays untouched; the time left is derived from it on
            // every pass, so the elapsed time is never subtracted more than once.
            var remaining = timeout;

            // true once this thread has been woken by the event. The event is auto-reset, so
            // the woken thread has consumed it; if it takes a signal while more are pending it
            // must set the event again to let another waiting thread proceed.
            bool hasLock = false;
            do {
                int signals;
                do {
                    signals = m_wakeEvents;
                    if (signals == 0)
                        break;
                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);

                if (signals >= 1) {
                    if (signals > 1 && hasLock)
                        m_hasTasks.Set();
                    return true;
                }

                // unchecked: Environment.TickCount wraps around, the difference is still the elapsed time
                if (timeout != -1)
                    remaining = Math.Max(0, unchecked(timeout - (Environment.TickCount - start)));

                // if there are no signals left, the first thread which gets here resets the event
                // (by passing the wait below), makes one more empty pass and then blocks

                hasLock = true;
            } while (m_hasTasks.WaitOne(remaining));

            return false;
        }

        /// <summary>
        /// Counts the calling thread as sleeping while it waits for a wake signal.
        /// </summary>
        /// <param name="timeout">The time to wait in milliseconds, <c>-1</c> to wait indefinitely.</param>
        /// <returns><c>true</c> if a signal has been received, <c>false</c> on timeout.</returns>
        bool Sleep(int timeout) {
            Interlocked.Increment(ref m_sleepingThreads);
            try {
                return FetchSignalOrWait(timeout);
            } finally {
                Interlocked.Decrement(ref m_sleepingThreads);
            }
        }
        #endregion

        /// <summary>
        /// Wakes one sleeping thread when there are more sleeping threads than pending wake
        /// signals, otherwise tries to start a new worker. Does nothing once the shutdown
        /// of the pool has been requested.
        /// </summary>
        protected void GrowPool() {
            if (m_exitRequired != 0)
                return;
            if (m_sleepingThreads > m_wakeEvents) {
                //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);

                // all sleeping threads may be gone
                SignalThread(); // wake a sleeping thread;

                // we can't check whether signal has been processed
                // anyway it may take some time for the thread to start
                // we will ensure that at least one thread is running

                EnsurePoolIsAlive();
            } else {
                // if there is no sleeping threads in the pool
                if (!StartWorker()) {
                    // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue
                    // send it a signal to spin again
                    SignalThread();
                    EnsurePoolIsAlive();
                }
            }
        }

        /// <summary>
        /// Starts a worker if the pool currently has no threads at all.
        /// </summary>
        private void EnsurePoolIsAlive() {
            if (AllocateThreadSlot(1)) {
                // if there were no threads in the pool
                RunWorkerThread();
            }
        }

        /// <summary>
        /// Called by a worker which has found no work. Waits for a wake signal and decides
        /// whether the worker stays in the pool.
        /// </summary>
        /// <returns><c>true</c> if the worker should continue, <c>false</c> if it should exit.</returns>
        private bool Suspend() {
            // no tasks left: if threads have a timeout before releasing, wait for a signal first
            if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout))
                return true;

            // release unused thread
            bool last;
            if (ReleaseThreadSlot(out last)) {
                // in case at the moment the last thread was being released
                // a new task was added to the queue, we need to try
                // to revoke the thread to avoid the situation when the task is left unprocessed
                if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false
                    SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it
                    return AllocateThreadSlot(1); // ensure that at least one thread is alive
                }

                return false;
            }

            // the slot is reserved, wait till infinity
            Sleep(-1);

            return true;
        }

        #region thread slots traits

        /// <summary>
        /// Allocates a slot for one more thread.
        /// </summary>
        /// <returns>
        /// <c>false</c> if the pool has reached its maximum size or is shutting down.
        /// </returns>
        bool AllocateThreadSlot() {
            int current;
            // use spins to allocate slot for the new thread
            do {
                current = m_createdThreads;
                if (current >= m_maxThreads || m_exitRequired != 0)
                    // no more slots left or the pool has been disposed
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));

            UpdateMaxThreads(current + 1);

            return true;
        }

        /// <summary>
        /// Allocates a slot only if it makes the pool size exactly <paramref name="desired"/>,
        /// i.e. the current size is <paramref name="desired"/> minus one.
        /// </summary>
        /// <param name="desired">The pool size after the allocation.</param>
        /// <returns><c>true</c> if the slot has been allocated.</returns>
        bool AllocateThreadSlot(int desired) {
            if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
                return false;

            UpdateMaxThreads(desired);

            return true;
        }

        /// <summary>
        /// Releases a thread slot unless it is reserved: slots within the minimum pool size
        /// are released only when the pool is shutting down.
        /// </summary>
        /// <param name="last">Set to <c>true</c> if the released slot was the last one.</param>
        /// <returns><c>true</c> if the slot has been released.</returns>
        bool ReleaseThreadSlot(out bool last) {
            last = false;
            int current;
            // use spins to release the slot of the thread
            do {
                current = m_createdThreads;
                if (current <= m_minThreads && m_exitRequired == 0)
                    // the thread is reserved
                    return false;
            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));

            last = (current == 1);

            return true;
        }

        /// <summary>
        /// releases thread slot unconditionally, used during cleanup
        /// </summary>
        /// <returns>true - no more threads left</returns>
        bool ReleaseThreadSlotAnyway() {
            var left = Interlocked.Decrement(ref m_createdThreads);
            return left == 0;
        }

        /// <summary>
        /// Raises the recorded maximum pool size to <paramref name="count"/> if it is smaller.
        /// </summary>
        /// <param name="count">The pool size which has just been reached.</param>
        void UpdateMaxThreads(int count) {
            int max;
            do {
                max = m_maxRunningThreads;
                if (max >= count)
                    break;
            } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
        }

        #endregion

        /// <summary>
        /// Allocates a thread slot and starts a new worker in it.
        /// </summary>
        /// <returns><c>true</c> if the worker has been started, <c>false</c> if no slot was available.</returns>
        bool StartWorker() {
            if (!AllocateThreadSlot())
                return false;

            // slot successfully allocated
            RunWorkerThread();
            return true;
        }

        /// <summary>
        /// Creates and starts a background thread running <see cref="Worker"/>;
        /// the caller must have allocated the thread slot already.
        /// </summary>
        void RunWorkerThread() {
            var worker = new Thread(this.Worker);
            worker.IsBackground = true;
            worker.Start();
        }

        /// <summary>
        /// Executes a unit of work fetched by <see cref="TryDequeue"/>; called on a worker thread.
        /// </summary>
        /// <param name="unit">The unit to execute.</param>
        protected abstract void InvokeUnit(TUnit unit);

        /// <summary>
        /// The body of a worker thread: runs the units while they are available, suspends
        /// when there are none and exits when the pool shuts down or the worker is released.
        /// </summary>
        void Worker() {
            TUnit unit;
            //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
            Interlocked.Increment(ref m_activeThreads);
            do {
                // exit if requested
                if (m_exitRequired != 0) {
                    // release the thread slot
                    Interlocked.Decrement(ref m_activeThreads);
                    if (ReleaseThreadSlotAnyway()) // it was the last worker
                        m_hasTasks.Dispose();
                    else
                        SignalThread(); // wake next worker
                    break;
                }

                // fetch task
                if (TryDequeue(out unit)) {
                    InvokeUnit(unit);
                    continue;
                }

                Interlocked.Decrement(ref m_activeThreads);

                // entering suspend state
                // keep this thread and wait
                if (!Suspend())
                    break;
                //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
                Interlocked.Increment(ref m_activeThreads);
            } while (true);
            //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
        }

        /// <summary>
        /// Requests the shutdown of the pool. Only the first call with
        /// <paramref name="disposing"/> set to <c>true</c> has an effect.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; with <c>false</c> (finalizer)
        /// nothing is done.
        /// </param>
        protected virtual void Dispose(bool disposing) {
            if (!disposing)
                return;

            // only the caller which switches the flag performs the shutdown
            if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
                return;

            // wake sleeping threads, the workers pass the signal along and the last one
            // disposes the event; if there are no threads the event is disposed right here
            if (m_createdThreads > 0)
                SignalThread();
            else
                m_hasTasks.Dispose();
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Requests the shutdown of the pool.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        /// <summary>
        /// Calls <see cref="Dispose(bool)"/> with <c>false</c>.
        /// </summary>
        ~DispatchPool() {
            Dispose(false);
        }
    }
}