view Implab/Parallels/WorkerPool.cs @ 14:e943453e5039 promises

Implemented interlocked queue; fixed promise synchronization
author cin
date Wed, 06 Nov 2013 17:49:12 +0400
parents b0feb5b9ad1c
children 0f982f9b7d4d
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A pool of dedicated background threads that runs queued delegates and reports
    /// the outcome of each one through a <see cref="Promise{T}"/>.
    /// </summary>
    /// <remarks>
    /// The minimum number of workers is started eagerly by the constructor. Additional
    /// workers (up to the maximum) are started from <see cref="Invoke{T}"/> when no worker
    /// is running or when more than one task is waiting in the queue. A worker that finds
    /// the queue empty retires immediately unless that would drop the pool below its minimum.
    /// All bookkeeping (queue, thread count, disposed flag, wake-up event) is serialized by
    /// a single lock on the queue object.
    /// </remarks>
    public class WorkerPool : IDisposable {
        readonly int m_minThreads;
        readonly int m_maxThreads;

        // number of allocated worker slots; modified only while holding the m_queue lock,
        // volatile so that ThreadCount can read it without taking the lock
        volatile int m_runningThreads;

        // written only while holding the m_queue lock; volatile for the lock-free fast check in Invoke
        volatile bool m_disposed = false;

        // this event signals that workers can try to fetch a task from the queue or that the pool has been disposed
        readonly ManualResetEvent m_hasTasks = new ManualResetEvent(false);

        // pending tasks; this object is also the lock that guards all mutable state of the pool
        readonly Queue<Action> m_queue = new Queue<Action>();

        /// <summary>
        /// Creates a pool that keeps between <paramref name="min"/> and <paramref name="max"/> worker threads.
        /// </summary>
        /// <param name="min">Number of workers started immediately and kept alive; values above <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">Upper bound for the number of workers; must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
        public WorkerPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");

            if (min > max)
                min = max;
            m_minThreads = min;
            m_maxThreads = max;

            InitPool();
        }

        /// <summary>
        /// Creates a pool with no permanently running workers and at most <paramref name="max"/> workers.
        /// </summary>
        /// <param name="max">Upper bound for the number of workers; must be positive.</param>
        public WorkerPool(int max)
            : this(0, max) {
        }

        /// <summary>
        /// Creates a pool with no permanently running workers whose upper bound is the
        /// worker-thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
        /// </summary>
        public WorkerPool() {
            int maxThreads, maxCP;
            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);

            m_minThreads = 0;
            m_maxThreads = maxThreads;

            InitPool();
        }

        // starts the permanent workers
        void InitPool() {
            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// Number of worker slots currently allocated (workers that are running or about to start).
        /// </summary>
        public int ThreadCount {
            get {
                return m_runningThreads;
            }
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution on a pool thread.
        /// </summary>
        /// <param name="task">The function to execute.</param>
        /// <returns>
        /// A promise that is resolved with the result of <paramref name="task"/>, or rejected
        /// with the exception it throws.
        /// </returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (m_disposed)
                throw new ObjectDisposedException(ToString());
            if (task == null)
                throw new ArgumentNullException("task");

            var promise = new Promise<T>();

            bool needWorker = EnqueueTask(delegate() {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    promise.Reject(e);
                }
            });

            if (needWorker)
                StartWorker();

            return promise;
        }

        // Tries to allocate a slot and start a new worker thread.
        // Returns false when the pool is already at its maximum or has been disposed.
        bool StartWorker() {
            lock (m_queue) {
                if (m_disposed || m_runningThreads >= m_maxThreads)
                    // no more slots left
                    return false;
                m_runningThreads++;
            }

            // slot successfully allocated

            try {
                var worker = new Thread(this.Worker);
                worker.IsBackground = true;
                worker.Start();
            } catch {
                // the thread could not be started, give the slot back so the counter stays accurate
                lock (m_queue)
                    m_runningThreads--;
                throw;
            }

            return true;
        }

        // Adds the task to the queue and wakes the workers.
        // Returns true when the caller should try to start one more worker: either nobody
        // is running (otherwise the task would wait forever in a pool with no permanent
        // workers) or there is a backlog of more than one task.
        bool EnqueueTask(Action task) {
            Debug.Assert(task != null);
            lock (m_queue) {
                // re-check under the lock: a task queued after disposal would never be executed
                if (m_disposed)
                    throw new ObjectDisposedException(ToString());

                m_queue.Enqueue(task);
                m_hasTasks.Set();
                return m_runningThreads == 0 || m_queue.Count > 1;
            }
        }

        // Blocks until a task is available and hands it out.
        // Returns false when the calling worker must terminate: the pool has been disposed,
        // or the queue is empty and the pool has more workers than its minimum.
        // The worker's slot is released before false is returned.
        bool FetchTask(out Action task) {
            task = null;

            while (true) {

                m_hasTasks.WaitOne();

                lock (m_queue) {
                    if (m_disposed) {
                        // the event is never reset once the pool is disposed, so every waiting worker gets here
                        m_runningThreads--;
                        return false;
                    }

                    if (m_queue.Count > 0) {
                        task = m_queue.Dequeue();
                        return true;
                    }

                    // no tasks left
                    // the lock ensures that this reset can't suppress the signal of a newly added task
                    m_hasTasks.Reset();

                    // the decision to retire is made under the same lock that guards the queue:
                    // a task enqueued afterwards observes the decremented count and starts a new worker
                    if (m_runningThreads > m_minThreads) {
                        m_runningThreads--;
                        return false;
                    }
                }

                // this thread is one of the permanent workers, go back and wait for new events
            }
        }

        // body of every worker thread: executes tasks until FetchTask tells it to stop
        void Worker() {
            Action task;
            while (FetchTask(out task))
                task();
        }

        /// <summary>
        /// Marks the pool as disposed and wakes all idle workers so they can terminate.
        /// </summary>
        /// <remarks>
        /// Tasks that are still in the queue are not executed and their promises are left
        /// unsettled; a task that is already running is allowed to finish. The wait handle
        /// is not closed here because workers may still be blocked on it.
        /// </remarks>
        /// <param name="disposing">true when called from <see cref="Dispose()"/>; nothing is done when called from the finalizer.</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                lock (m_queue) {
                    if (m_disposed)
                        return;
                    m_disposed = true;
                    // signalled under the queue lock so that no worker can reset the event after this point
                    m_hasTasks.Set();
                }
                GC.SuppressFinalize(this);
            }
        }

        /// <summary>
        /// Stops the pool; see <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~WorkerPool() {
            Dispose(false);
        }
    }
}