changeset 13:b0feb5b9ad1c promises

small fixes, WorkerPool still incomplete
author cin
date Wed, 06 Nov 2013 01:07:55 +0400
parents eb418ba8275b
children e943453e5039
files Implab.Test/AsyncTests.cs Implab/IProgressNotifier.cs Implab/ManagedPromise.cs Implab/Parallels/WorkerPool.cs
diffstat 4 files changed, 75 insertions(+), 5 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Tue Nov 05 19:55:34 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Wed Nov 06 01:07:55 2013 +0400
@@ -107,6 +107,36 @@
 		}
 
         [TestMethod]
+        public void WorkerPoolSizeTest() {
+            var pool = new WorkerPool(5,10);
+
+            Assert.AreEqual(5, pool.ThreadCount);
+
+            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+
+            Assert.AreEqual(5, pool.ThreadCount);
+
+            for (int i = 0; i < 100; i++)
+                pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+            Assert.AreEqual(10, pool.ThreadCount);
+        }
+
+        [TestMethod]
+        public void WorkerPoolCorrectTest() {
+            var pool = new WorkerPool(5, 20);
+
+            var count = 0;
+            for (int i = 0; i < 1000; i++)
+                pool
+                    .Invoke(() => 1)
+                    .Then(x => Interlocked.Add(ref count, x));
+
+            Assert.AreEqual(1000, count);
+        }
+
+        [TestMethod]
         public void ComplexCase1Test() {
             var flags = new bool[3];
 
--- a/Implab/IProgressNotifier.cs	Tue Nov 05 19:55:34 2013 +0400
+++ b/Implab/IProgressNotifier.cs	Wed Nov 06 01:07:55 2013 +0400
@@ -9,6 +9,6 @@
     {
         event EventHandler<ValueEventArgs<string>> MessageUpdated;
         event EventHandler<ValueEventArgs<float>> ProgressUpdated;
-        EventHandler<ProgressInitEventArgs> ProgressInit;
+        event EventHandler<ProgressInitEventArgs> ProgressInit;
     }
 }
--- a/Implab/ManagedPromise.cs	Tue Nov 05 19:55:34 2013 +0400
+++ b/Implab/ManagedPromise.cs	Wed Nov 06 01:07:55 2013 +0400
@@ -5,7 +5,7 @@
 
 namespace Implab {
 
-    public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
+    /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
         
-    }
+    }*/
 }
--- a/Implab/Parallels/WorkerPool.cs	Tue Nov 05 19:55:34 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Wed Nov 06 01:07:55 2013 +0400
@@ -13,21 +13,50 @@
         object m_lock = new object();
 
         bool m_disposed = false;
+
+        // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
         ManualResetEvent m_hasTasks = new ManualResetEvent(false);
         Queue<Action> m_queue = new Queue<Action>();
 
         public WorkerPool(int min, int max) {
             if (min < 0)
                 throw new ArgumentOutOfRangeException("min");
+            if (max <= 0)
+                throw new ArgumentOutOfRangeException("max");
+
             if (min > max)
                 min = max;
             m_minThreads = min;
             m_maxThreads = max;
 
+            InitPool();
+        }
+
+        public WorkerPool(int max)
+            : this(0, max) {
+        }
+
+        public WorkerPool() {
+            int maxThreads, maxCP;
+            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
+
+            m_minThreads = 0;
+            m_maxThreads = maxThreads;
+
+            InitPool();
+        }
+
+        void InitPool() {
             for (int i = 0; i < m_minThreads; i++)
                 StartWorker();
         }
 
+        public int ThreadCount {
+            get {
+                return m_runningThreads;
+            }
+        }
+
         public Promise<T> Invoke<T>(Func<T> task) {
             if (m_disposed)
                 throw new ObjectDisposedException(ToString());
@@ -36,7 +65,16 @@
 
             var promise = new Promise<T>();
 
+            var queueLen = EnqueueTask(delegate() {
+                try {
+                    promise.Resolve(task());
+                } catch (Exception e) {
+                    promise.Reject(e);
+                }
+            });
 
+            if (queueLen > 1)
+                StartWorker();
 
             return promise;
         }
@@ -53,16 +91,18 @@
             // slot successfully allocated
 
             var worker = new Thread(this.Worker);
+            worker.IsBackground = true;
             worker.Start();
 
             return true;
         }
 
-        void EnqueueTask(Action task) {
+        int EnqueueTask(Action task) {
             Debug.Assert(task != null);
             lock (m_queue) {
                 m_queue.Enqueue(task);
                 m_hasTasks.Set();
+                return m_queue.Count;
             }
         }
 
@@ -83,7 +123,7 @@
                     }
 
                     // no tasks left
-                    // signal that no more tasks left, lock ensures that this event won't suppress newly added task
+                    // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
                     m_hasTasks.Reset();
                 }