diff Implab/Parallels/WorkerPool.cs @ 16:5a4b735ba669 promises

sync
author cin
date Thu, 07 Nov 2013 20:20:26 +0400
parents 0f982f9b7d4d
children 7cd4a843b4e4
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs	Thu Nov 07 03:41:32 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Thu Nov 07 20:20:26 2013 +0400
@@ -10,20 +10,27 @@
 
         MTQueue<Action> m_queue = new MTQueue<Action>();
         int m_queueLength = 0;
+        readonly int m_threshold = 1;
 
-        public WorkerPool(int minThreads, int maxThreads)
+        public WorkerPool(int minThreads, int maxThreads, int threshold)
             : base(minThreads, maxThreads) {
-                InitPool();
+            m_threshold = threshold;
+            InitPool();
+        }
+
+        public WorkerPool(int minThreads, int maxThreads) :
+            base(minThreads, maxThreads) {
+            InitPool();
         }
 
         public WorkerPool(int threads)
             : base(threads) {
-                InitPool();
+            InitPool();
         }
 
         public WorkerPool()
             : base() {
-                InitPool();
+            InitPool();
         }
 
         public Promise<T> Invoke<T>(Func<T> task) {
@@ -47,11 +54,20 @@
 
         protected void EnqueueTask(Action unit) {
             Debug.Assert(unit != null);
-            Interlocked.Increment(ref m_queueLength);
+            var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
-            // if there are sleeping threads in the pool wake one
-            // probably this will lead a dry run
-            WakeNewWorker();
+
+            if (ThreadCount == 0)
+                // force to start
+                WakeNewWorker(false);
+        }
+
+        protected override void WakeNewWorker(bool extend) {
+            if (extend && m_queueLength <= m_threshold)
+                // in this case we are in active thread and it request for additional workers
+                // satisfy it only when queue is longer than threshold
+                return;
+            base.WakeNewWorker(extend);
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -65,5 +81,10 @@
         protected override void InvokeUnit(Action unit) {
             unit();
         }
+
+        protected override void Suspend() {
+            if (m_queueLength == 0)
+                base.Suspend();
+        }
     }
 }