diff Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents b0feb5b9ad1c
children 5a4b735ba669
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -6,66 +6,35 @@
 using System.Diagnostics;
 
 namespace Implab.Parallels {
-    public class WorkerPool : IDisposable {
-        readonly int m_minThreads;
-        readonly int m_maxThreads;
-        int m_runningThreads;
-        object m_lock = new object();
-
-        bool m_disposed = false;
-
-        // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
-        ManualResetEvent m_hasTasks = new ManualResetEvent(false);
-        Queue<Action> m_queue = new Queue<Action>();
+    public class WorkerPool : DispatchPool<Action> {
 
-        public WorkerPool(int min, int max) {
-            if (min < 0)
-                throw new ArgumentOutOfRangeException("min");
-            if (max <= 0)
-                throw new ArgumentOutOfRangeException("max");
+        MTQueue<Action> m_queue = new MTQueue<Action>();
+        int m_queueLength = 0;
 
-            if (min > max)
-                min = max;
-            m_minThreads = min;
-            m_maxThreads = max;
-
-            InitPool();
+        public WorkerPool(int minThreads, int maxThreads)
+            : base(minThreads, maxThreads) {
+                InitPool();
         }
 
-        public WorkerPool(int max)
-            : this(0, max) {
+        public WorkerPool(int threads)
+            : base(threads) {
+                InitPool();
         }
 
-        public WorkerPool() {
-            int maxThreads, maxCP;
-            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
-
-            m_minThreads = 0;
-            m_maxThreads = maxThreads;
-
-            InitPool();
-        }
-
-        void InitPool() {
-            for (int i = 0; i < m_minThreads; i++)
-                StartWorker();
-        }
-
-        public int ThreadCount {
-            get {
-                return m_runningThreads;
-            }
+        public WorkerPool()
+            : base() {
+                InitPool();
         }
 
         public Promise<T> Invoke<T>(Func<T> task) {
-            if (m_disposed)
-                throw new ObjectDisposedException(ToString());
             if (task == null)
                 throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
 
             var promise = new Promise<T>();
 
-            var queueLen = EnqueueTask(delegate() {
+            EnqueueTask(delegate() {
                 try {
                     promise.Resolve(task());
                 } catch (Exception e) {
@@ -73,99 +42,28 @@
                 }
             });
 
-            if (queueLen > 1)
-                StartWorker();
-
             return promise;
         }
 
-        bool StartWorker() {
-            var current = m_runningThreads;
-            // use spins to allocate slot for the new thread
-            do {
-                if (current >= m_maxThreads)
-                    // no more slots left
-                    return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
-
-            // slot successfully allocated
-
-            var worker = new Thread(this.Worker);
-            worker.IsBackground = true;
-            worker.Start();
-
-            return true;
-        }
-
-        int EnqueueTask(Action task) {
-            Debug.Assert(task != null);
-            lock (m_queue) {
-                m_queue.Enqueue(task);
-                m_hasTasks.Set();
-                return m_queue.Count;
-            }
+        protected void EnqueueTask(Action unit) {
+            Debug.Assert(unit != null);
+            Interlocked.Increment(ref m_queueLength);
+            m_queue.Enqueue(unit);
+            // if there are sleeping threads in the pool wake one
+            // probably this will lead a dry run
+            WakeNewWorker();
         }
 
-        bool FetchTask(out Action task) {
-            task = null;
-
-            while (true) {
-
-                m_hasTasks.WaitOne();
-
-                if (m_disposed)
-                    return false;
-
-                lock (m_queue) {
-                    if (m_queue.Count > 0) {
-                        task = m_queue.Dequeue();
-                        return true;
-                    }
-
-                    // no tasks left
-                    // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
-                    m_hasTasks.Reset();
-                }
-                
-                bool exit = true;
-
-                var current = m_runningThreads;
-                do {
-                    if (current <= m_minThreads) {
-                        exit = false; // this thread should return and wait for the new events
-                        break;
-                    }
-                } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
-
-                if (exit)
-                    return false;
+        protected override bool TryDequeue(out Action unit) {
+            if (m_queue.TryDequeue(out unit)) {
+                Interlocked.Decrement(ref m_queueLength);
+                return true;
             }
+            return false;
         }
 
-        void Worker() {
-            Action task;
-            while (FetchTask(out task))
-                task();
-        }
-
-        protected virtual void Dispose(bool disposing) {
-            if (disposing) {
-                lock (m_lock) {
-                    if (m_disposed)
-                        return;
-                    m_disposed = true;
-                }
-                m_hasTasks.Set();
-                GC.SuppressFinalize(this);
-            }
-        }
-
-        public void Dispose() {
-            Dispose(true);
-        }
-
-        ~WorkerPool() {
-            Dispose(false);
+        protected override void InvokeUnit(Action unit) {
+            unit();
         }
     }
 }