diff Implab/Parallels/WorkerPool.cs @ 12:eb418ba8275b promises

refactoring, added WorkerPool
author cin
date Tue, 05 Nov 2013 19:55:34 +0400
parents
children b0feb5b9ad1c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/WorkerPool.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,131 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public class WorkerPool : IDisposable {
+        readonly int m_minThreads;
+        readonly int m_maxThreads;
+        int m_runningThreads;
+        object m_lock = new object();
+
+        bool m_disposed = false;
+        ManualResetEvent m_hasTasks = new ManualResetEvent(false);
+        Queue<Action> m_queue = new Queue<Action>();
+
+        public WorkerPool(int min, int max) {
+            if (min < 0)
+                throw new ArgumentOutOfRangeException("min");
+            if (min > max)
+                min = max;
+            m_minThreads = min;
+            m_maxThreads = max;
+
+            for (int i = 0; i < m_minThreads; i++)
+                StartWorker();
+        }
+
+        public Promise<T> Invoke<T>(Func<T> task) {
+            if (m_disposed)
+                throw new ObjectDisposedException(ToString());
+            if (task == null)
+                throw new ArgumentNullException("task");
+
+            var promise = new Promise<T>();
+
+
+
+            return promise;
+        }
+
+        bool StartWorker() {
+            var current = m_runningThreads;
+            // use spins to allocate slot for the new thread
+            do {
+                if (current >= m_maxThreads)
+                    // no more slots left
+                    return false;
+            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+
+            // slot successfully allocated
+
+            var worker = new Thread(this.Worker);
+            worker.Start();
+
+            return true;
+        }
+
+        void EnqueueTask(Action task) {
+            Debug.Assert(task != null);
+            lock (m_queue) {
+                m_queue.Enqueue(task);
+                m_hasTasks.Set();
+            }
+        }
+
+        bool FetchTask(out Action task) {
+            task = null;
+
+            while (true) {
+
+                m_hasTasks.WaitOne();
+
+                if (m_disposed)
+                    return false;
+
+                lock (m_queue) {
+                    if (m_queue.Count > 0) {
+                        task = m_queue.Dequeue();
+                        return true;
+                    }
+
+                    // no tasks left
+                    // signal that no more tasks left, lock ensures that this event won't suppress newly added task
+                    m_hasTasks.Reset();
+                }
+                
+                bool exit = true;
+
+                var current = m_runningThreads;
+                do {
+                    if (current <= m_minThreads) {
+                        exit = false; // this thread should return and wait for the new events
+                        break;
+                    }
+                } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+
+                if (exit)
+                    return false;
+            }
+        }
+
+        void Worker() {
+            Action task;
+            while (FetchTask(out task))
+                task();
+        }
+
+        protected virtual void Dispose(bool disposing) {
+            if (disposing) {
+                lock (m_lock) {
+                    if (m_disposed)
+                        return;
+                    m_disposed = true;
+                }
+                m_hasTasks.Set();
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        public void Dispose() {
+            Dispose(true);
+        }
+
+        ~WorkerPool() {
+            Dispose(false);
+        }
+    }
+}