view Implab/Parallels/WorkerPool.cs @ 12:eb418ba8275b promises

refactoring, added WorkerPool
author cin
date Tue, 05 Nov 2013 19:55:34 +0400
parents
children b0feb5b9ad1c
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// A pool of dedicated threads that execute queued tasks and report the
    /// outcome through promises.
    /// </summary>
    /// <remarks>
    /// The pool keeps at least <c>min</c> threads alive and grows on demand up to
    /// <c>max</c>. Threads above the minimum retire as soon as they find the queue
    /// empty. Workers are created as regular (foreground) threads, so the pool has
    /// to be disposed to let them finish.
    /// </remarks>
    public class WorkerPool : IDisposable {
        readonly int m_minThreads;
        readonly int m_maxThreads;
        // number of allocated worker slots, modified only through Interlocked operations
        int m_runningThreads;
        // guards the transition of m_disposed
        readonly object m_lock = new object();

        // volatile: written by the disposing thread, read by workers and callers
        volatile bool m_disposed;
        // signalled while the queue may contain tasks or the pool is disposed
        readonly ManualResetEvent m_hasTasks = new ManualResetEvent(false);
        // pending tasks; the queue instance also serves as the lock for itself,
        // for m_hasTasks transitions and for the decision to retire a worker
        readonly Queue<Action> m_queue = new Queue<Action>();

        /// <summary>
        /// Creates the pool and immediately starts the minimum number of workers.
        /// </summary>
        /// <param name="min">Number of threads kept alive even when idle. Values greater than <paramref name="max"/> are clamped to it.</param>
        /// <param name="max">Upper limit of simultaneously running threads, must be positive.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or <paramref name="max"/> is not positive.</exception>
        public WorkerPool(int min, int max) {
            if (min < 0)
                throw new ArgumentOutOfRangeException("min");
            // a pool without a single slot could never execute anything
            if (max <= 0)
                throw new ArgumentOutOfRangeException("max");
            if (min > max)
                min = max;
            m_minThreads = min;
            m_maxThreads = max;

            for (int i = 0; i < m_minThreads; i++)
                StartWorker();
        }

        /// <summary>
        /// Schedules the specified function for execution on a pool thread.
        /// </summary>
        /// <typeparam name="T">Type of the result produced by the task.</typeparam>
        /// <param name="task">The function to execute.</param>
        /// <returns>A promise which receives the result of the task or the exception thrown by it.</returns>
        /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (m_disposed)
                throw new ObjectDisposedException(ToString());
            if (task == null)
                throw new ArgumentNullException("task");

            var promise = new Promise<T>();

            // NOTE(review): relies on Promise<T> exposing Resolve(T) and
            // Reject(Exception) - confirm against the Promise<T> declaration.
            EnqueueTask(() => {
                try {
                    promise.Resolve(task());
                } catch (Exception e) {
                    // the failure belongs to the promise, the worker thread must survive
                    promise.Reject(e);
                }
            });

            return promise;
        }

        /// <summary>
        /// Tries to allocate a slot and start one more worker thread.
        /// </summary>
        /// <returns><c>true</c> if a worker was started, <c>false</c> if the pool already runs the maximum number of threads.</returns>
        bool StartWorker() {
            var current = m_runningThreads;
            // use spins to allocate slot for the new thread
            while (true) {
                if (current >= m_maxThreads)
                    // no more slots left
                    return false;

                var observed = Interlocked.CompareExchange(ref m_runningThreads, current + 1, current);
                if (observed == current)
                    break;

                // lost the race, retry against the value actually stored in the counter
                current = observed;
            }

            // slot successfully allocated

            try {
                var worker = new Thread(this.Worker);
                worker.Start();
            } catch {
                // the thread didn't start, give the slot back
                Interlocked.Decrement(ref m_runningThreads);
                throw;
            }

            return true;
        }

        /// <summary>
        /// Tries to release the slot of the calling worker. Must be called while holding the queue lock.
        /// </summary>
        /// <returns><c>true</c> if the slot was released and the worker must exit, <c>false</c> if the worker belongs to the minimum set and must stay.</returns>
        bool TryReleaseSlot() {
            var current = m_runningThreads;
            while (true) {
                if (current <= m_minThreads)
                    return false;

                var observed = Interlocked.CompareExchange(ref m_runningThreads, current - 1, current);
                if (observed == current)
                    return true;

                current = observed;
            }
        }

        /// <summary>
        /// Puts the task into the queue, wakes the workers and grows the pool when
        /// the task would otherwise wait behind a backlog or when no worker exists.
        /// </summary>
        /// <param name="task">The task to enqueue, must not be null.</param>
        void EnqueueTask(Action task) {
            Debug.Assert(task != null);

            bool grow;
            lock (m_queue) {
                m_queue.Enqueue(task);
                m_hasTasks.Set();
                // Workers release their slots under the same lock, therefore a zero
                // read here means that nobody is going to pick this task up.
                grow = m_queue.Count > 1 || m_runningThreads == 0;
            }

            if (grow)
                StartWorker();
        }

        /// <summary>
        /// Blocks the calling worker until a task is available.
        /// </summary>
        /// <param name="task">Receives the dequeued task when the method returns <c>true</c>, otherwise null.</param>
        /// <returns><c>true</c> if a task was fetched, <c>false</c> if the worker must exit because the pool is disposed or the worker is redundant.</returns>
        bool FetchTask(out Action task) {
            task = null;

            while (true) {

                m_hasTasks.WaitOne();

                lock (m_queue) {
                    // Checked under the lock: Dispose signals the event under the same
                    // lock, so the Reset below can never hide the disposal signal.
                    if (m_disposed) {
                        Interlocked.Decrement(ref m_runningThreads);
                        return false;
                    }

                    if (m_queue.Count > 0) {
                        task = m_queue.Dequeue();
                        return true;
                    }

                    // no tasks left
                    // signal that no more tasks left, lock ensures that this event won't suppress newly added task
                    m_hasTasks.Reset();

                    // Deciding under the lock keeps the counter consistent with what
                    // EnqueueTask observes when it decides whether to grow the pool.
                    if (TryReleaseSlot())
                        return false;
                }
                // this thread belongs to the minimum set, wait for the new events
            }
        }

        /// <summary>
        /// The body of a worker thread: executes tasks until <see cref="FetchTask"/> tells it to stop.
        /// </summary>
        void Worker() {
            Action task;
            while (FetchTask(out task))
                task();
        }

        /// <summary>
        /// Marks the pool as disposed and wakes all workers so they can exit.
        /// Tasks still waiting in the queue are not executed.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>, <c>false</c> when called from the finalizer (nothing is done in that case).</param>
        protected virtual void Dispose(bool disposing) {
            if (disposing) {
                lock (m_lock) {
                    if (m_disposed)
                        return;
                    m_disposed = true;
                }
                // Signal under the queue lock: a worker which has already seen the empty
                // queue finishes its Reset first, so the event stays signalled afterwards.
                // The event itself is not closed since workers may still be waiting on it.
                lock (m_queue) {
                    m_hasTasks.Set();
                }
                GC.SuppressFinalize(this);
            }
        }

        /// <summary>
        /// Stops the pool, see <see cref="Dispose(bool)"/>.
        /// </summary>
        public void Dispose() {
            Dispose(true);
        }

        ~WorkerPool() {
            Dispose(false);
        }
    }
}