Mercurial > pub > ImplabNet
changeset 13:b0feb5b9ad1c promises
small fixes, WorkerPool still incomplete
author | cin |
---|---|
date | Wed, 06 Nov 2013 01:07:55 +0400 |
parents | eb418ba8275b |
children | e943453e5039 |
files | Implab.Test/AsyncTests.cs Implab/IProgressNotifier.cs Implab/ManagedPromise.cs Implab/Parallels/WorkerPool.cs |
diffstat | 4 files changed, 75 insertions(+), 5 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Tue Nov 05 19:55:34 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Wed Nov 06 01:07:55 2013 +0400 @@ -107,6 +107,36 @@ } [TestMethod] + public void WorkerPoolSizeTest() { + var pool = new WorkerPool(5,10); + + Assert.AreEqual(5, pool.ThreadCount); + + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + + Assert.AreEqual(5, pool.ThreadCount); + + for (int i = 0; i < 100; i++) + pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + Assert.AreEqual(10, pool.ThreadCount); + } + + [TestMethod] + public void WorkerPoolCorrectTest() { + var pool = new WorkerPool(5, 20); + + var count = 0; + for (int i = 0; i < 1000; i++) + pool + .Invoke(() => 1) + .Then(x => Interlocked.Add(ref count, x)); + + Assert.AreEqual(1000, count); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3];
--- a/Implab/IProgressNotifier.cs Tue Nov 05 19:55:34 2013 +0400 +++ b/Implab/IProgressNotifier.cs Wed Nov 06 01:07:55 2013 +0400 @@ -9,6 +9,6 @@ { event EventHandler<ValueEventArgs<string>> MessageUpdated; event EventHandler<ValueEventArgs<float>> ProgressUpdated; - EventHandler<ProgressInitEventArgs> ProgressInit; + event EventHandler<ProgressInitEventArgs> ProgressInit; } }
--- a/Implab/ManagedPromise.cs Tue Nov 05 19:55:34 2013 +0400 +++ b/Implab/ManagedPromise.cs Wed Nov 06 01:07:55 2013 +0400 @@ -5,7 +5,7 @@ namespace Implab { - public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { + /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { - } + }*/ }
--- a/Implab/Parallels/WorkerPool.cs Tue Nov 05 19:55:34 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Wed Nov 06 01:07:55 2013 +0400 @@ -13,21 +13,50 @@ object m_lock = new object(); bool m_disposed = false; + + // this event will signal that workers can try to fetch a task from queue or the pool has been disposed ManualResetEvent m_hasTasks = new ManualResetEvent(false); Queue<Action> m_queue = new Queue<Action>(); public WorkerPool(int min, int max) { if (min < 0) throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + if (min > max) min = max; m_minThreads = min; m_maxThreads = max; + InitPool(); + } + + public WorkerPool(int max) + : this(0, max) { + } + + public WorkerPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + + InitPool(); + } + + void InitPool() { for (int i = 0; i < m_minThreads; i++) StartWorker(); } + public int ThreadCount { + get { + return m_runningThreads; + } + } + public Promise<T> Invoke<T>(Func<T> task) { if (m_disposed) throw new ObjectDisposedException(ToString()); @@ -36,7 +65,16 @@ var promise = new Promise<T>(); + var queueLen = EnqueueTask(delegate() { + try { + promise.Resolve(task()); + } catch (Exception e) { + promise.Reject(e); + } + }); + if (queueLen > 1) + StartWorker(); return promise; } @@ -53,16 +91,18 @@ // slot successfully allocated var worker = new Thread(this.Worker); + worker.IsBackground = true; worker.Start(); return true; } - void EnqueueTask(Action task) { + int EnqueueTask(Action task) { Debug.Assert(task != null); lock (m_queue) { m_queue.Enqueue(task); m_hasTasks.Set(); + return m_queue.Count; } } @@ -83,7 +123,7 @@ } // no tasks left - // signal that no more tasks left, lock ensures that this event won't suppress newly added task + // signal that no more tasks left, current lock ensures that this event won't suppress newly added task m_hasTasks.Reset(); }