changeset 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400 (2013-11-06)
parents e943453e5039
children 5a4b735ba669
files Implab.Test/AsyncTests.cs Implab.v11.suo Implab/Implab.csproj Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs
diffstat 7 files changed, 506 insertions(+), 221 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -4,71 +4,64 @@
 using System.Threading;
 using Implab.Parallels;
 
-namespace Implab.Test
-{
-	[TestClass]
-	public class AsyncTests
-	{
-		[TestMethod]
-		public void ResolveTest ()
-		{
-			int res = -1;
-			var p = new Promise<int> ();
-			p.Then (x => res = x);
-			p.Resolve (100);
+namespace Implab.Test {
+    [TestClass]
+    public class AsyncTests {
+        [TestMethod]
+        public void ResolveTest() {
+            int res = -1;
+            var p = new Promise<int>();
+            p.Then(x => res = x);
+            p.Resolve(100);
 
-			Assert.AreEqual (res, 100);
-		}
+            Assert.AreEqual(res, 100);
+        }
 
         [TestMethod]
-		public void RejectTest ()
-		{
-			int res = -1;
-			Exception err = null;
+        public void RejectTest() {
+            int res = -1;
+            Exception err = null;
 
-			var p = new Promise<int> ();
-			p.Then (x => res = x, e => err = e);
-			p.Reject (new ApplicationException ("error"));
+            var p = new Promise<int>();
+            p.Then(x => res = x, e => err = e);
+            p.Reject(new ApplicationException("error"));
 
-			Assert.AreEqual (res, -1);
-			Assert.AreEqual (err.Message, "error");
+            Assert.AreEqual(res, -1);
+            Assert.AreEqual(err.Message, "error");
 
-		}
+        }
 
         [TestMethod]
-		public void JoinSuccessTest ()
-		{
-			var p = new Promise<int> ();
-			p.Resolve (100);
-			Assert.AreEqual (p.Join (), 100);
-		}
+        public void JoinSuccessTest() {
+            var p = new Promise<int>();
+            p.Resolve(100);
+            Assert.AreEqual(p.Join(), 100);
+        }
 
         [TestMethod]
-		public void JoinFailTest ()
-		{
-			var p = new Promise<int> ();
-			p.Reject (new ApplicationException ("failed"));
+        public void JoinFailTest() {
+            var p = new Promise<int>();
+            p.Reject(new ApplicationException("failed"));
 
-			try {
-				p.Join ();
-				throw new ApplicationException ("WRONG!");
-			} catch (TargetInvocationException err) {
-				Assert.AreEqual (err.InnerException.Message, "failed");
-			} catch {
-				Assert.Fail ("Got wrong excaption");
-			}
-		}
+            try {
+                p.Join();
+                throw new ApplicationException("WRONG!");
+            } catch (TargetInvocationException err) {
+                Assert.AreEqual(err.InnerException.Message, "failed");
+            } catch {
+                Assert.Fail("Got wrong excaption");
+            }
+        }
 
         [TestMethod]
-		public void MapTest ()
-		{
-			var p = new Promise<int> ();
+        public void MapTest() {
+            var p = new Promise<int>();
 
-			var p2 = p.Map (x => x.ToString ());
-			p.Resolve (100);
+            var p2 = p.Map(x => x.ToString());
+            p.Resolve(100);
 
-			Assert.AreEqual (p2.Join (), "100");
-		}
+            Assert.AreEqual(p2.Join(), "100");
+        }
 
         [TestMethod]
         public void FixErrorTest() {
@@ -82,65 +75,90 @@
         }
 
         [TestMethod]
-		public void ChainTest ()
-		{
-			var p1 = new Promise<int> ();
+        public void ChainTest() {
+            var p1 = new Promise<int>();
 
-			var p3 = p1.Chain (x => {
-				var p2 = new Promise<string> ();
-				p2.Resolve (x.ToString ());
-				return p2;
-			});
+            var p3 = p1.Chain(x => {
+                var p2 = new Promise<string>();
+                p2.Resolve(x.ToString());
+                return p2;
+            });
 
-			p1.Resolve (100);
+            p1.Resolve(100);
 
-			Assert.AreEqual (p3.Join (), "100");
-		}
+            Assert.AreEqual(p3.Join(), "100");
+        }
 
         [TestMethod]
-		public void PoolTest ()
-		{
-			var pid = Thread.CurrentThread.ManagedThreadId;
-			var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
+        public void PoolTest() {
+            var pid = Thread.CurrentThread.ManagedThreadId;
+            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
 
-			Assert.AreNotEqual (pid, p.Join ());
-		}
+            Assert.AreNotEqual(pid, p.Join());
+        }
 
         [TestMethod]
         public void WorkerPoolSizeTest() {
-            var pool = new WorkerPool(5,10);
+            var pool = new WorkerPool(5, 10);
 
             Assert.AreEqual(5, pool.ThreadCount);
 
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
 
             Assert.AreEqual(5, pool.ThreadCount);
 
             for (int i = 0; i < 100; i++)
-                pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            Thread.Sleep(100);
             Assert.AreEqual(10, pool.ThreadCount);
+
+            pool.Dispose();
         }
 
         [TestMethod]
         public void WorkerPoolCorrectTest() {
-            var pool = new WorkerPool(5, 20);
+            var pool = new WorkerPool();
+
+            int iterations = 1000;
+            int pending = iterations;
+            var stop = new ManualResetEvent(false);
 
             var count = 0;
-            for (int i = 0; i < 1000; i++)
+            for (int i = 0; i < iterations; i++) {
                 pool
                     .Invoke(() => 1)
-                    .Then(x => Interlocked.Add(ref count, x));
+                    .Then(x => Interlocked.Add(ref count, x))
+                    .Then(x => Math.Log10(x))
+                    .Anyway(() => {
+                        Interlocked.Decrement(ref pending);
+                        if (pending == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
 
-            Assert.AreEqual(1000, count);
+            Assert.AreEqual(iterations, count);
+            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
+            pool.Dispose();
+            
+        }
+
+        [TestMethod]
+        public void WorkerPoolDisposeTest() {
+            var pool = new WorkerPool(5, 20);
+            Assert.AreEqual(5, pool.ThreadCount);
+            pool.Dispose();
+            Thread.Sleep(100);
+            Assert.AreEqual(0, pool.ThreadCount);
+            pool.Dispose();
         }
 
         [TestMethod]
         public void MTQueueTest() {
             var queue = new MTQueue<int>();
-            var pool = new WorkerPool(5, 20);
-
             int res;
 
             queue.Enqueue(10);
@@ -169,33 +187,27 @@
                 var wn = i;
                 AsyncPool
                     .InvokeNewThread(() => {
-                        Console.WriteLine("Started writer: {0}", wn);
                         for (int ii = 0; ii < itemsPerWriter; ii++) {
                             queue.Enqueue(1);
-                            Thread.Sleep(1);
                         }
-                        Console.WriteLine("Stopped writer: {0}", wn);
                         return 1;
                     })
-                    .Then(x => Interlocked.Decrement(ref writers) );
+                    .Anyway(() => Interlocked.Decrement(ref writers));
             }
-            
+
             for (int i = 0; i < 10; i++) {
                 Interlocked.Increment(ref readers);
                 var wn = i;
                 AsyncPool
                     .InvokeNewThread(() => {
                         int t;
-                        Console.WriteLine("Started reader: {0}", wn);
                         do {
                             while (queue.TryDequeue(out t))
                                 Interlocked.Add(ref total, t);
-                            Thread.Sleep(0);
                         } while (writers > 0);
-                        Console.WriteLine("Stopped reader: {0}", wn);
                         return 1;
                     })
-                    .Then(x => {
+                    .Anyway(() => {
                         Interlocked.Decrement(ref readers);
                         if (readers == 0)
                             stop.Set();
@@ -208,6 +220,55 @@
         }
 
         [TestMethod]
+        public void ParallelMapTest() {
+
+            int count = 100000;
+
+            double[] args = new double[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = rand.NextDouble();
+
+            var t = Environment.TickCount;
+            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
+
+            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
+        public void ParallelForEachTest() {
+
+            int count = 100000;
+
+            int[] args = new int[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = (int)(rand.NextDouble() * 100);
+
+            int result = 0;
+
+            var t = Environment.TickCount;
+            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
+
+            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
+
+            int result2 = 0;
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                result2 += args[i];
+            Assert.AreEqual(result2, result);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
         public void ComplexCase1Test() {
             var flags = new bool[3];
 
@@ -219,7 +280,7 @@
                 .Chain(x =>
                     PromiseHelper
                         .Sleep(200, "Hi, " + x)
-                        .Map( y => y )
+                        .Map(y => y)
                         .Cancelled(() => flags[1] = true)
                 )
                 .Cancelled(() => flags[2] = true);
@@ -228,13 +289,13 @@
             try {
                 Assert.AreEqual(p.Join(), "Hi, Alan");
                 Assert.Fail("Shouldn't get here");
-            } catch(OperationCanceledException) {
+            } catch (OperationCanceledException) {
             }
 
             Assert.IsFalse(flags[0]);
             Assert.IsTrue(flags[1]);
             Assert.IsTrue(flags[2]);
         }
-	}
+    }
 }
 
Binary file Implab.v11.suo has changed
--- a/Implab/Implab.csproj	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab/Implab.csproj	Thu Nov 07 03:41:32 2013 +0400
@@ -38,6 +38,8 @@
     <Compile Include="IPromise.cs" />
     <Compile Include="ITaskController.cs" />
     <Compile Include="ManagedPromise.cs" />
+    <Compile Include="Parallels\DispatchPool.cs" />
+    <Compile Include="Parallels\ArrayTraits.cs" />
     <Compile Include="Parallels\MTQueue.cs" />
     <Compile Include="Parallels\WorkerPool.cs" />
     <Compile Include="PromiseState.cs" />
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -0,0 +1,152 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public static class ArrayTraits {
+        class ArrayIterator<TSrc> : DispatchPool<int> {
+            readonly Action<TSrc> m_action;
+            readonly TSrc[] m_source;
+            readonly Promise<int> m_promise = new Promise<int>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
+                : base(threads) {
+
+                Debug.Assert(source != null);
+                Debug.Assert(action != null);
+
+                m_next = 0;
+                m_source = source;
+                m_pending = source.Length;
+                m_action = action;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<int> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                int index;
+                unit = -1;
+                do {
+                    index = m_next;
+                    if (index >= m_source.Length)
+                        return false;
+                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
+
+                unit = index;
+                return true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_action(m_source[unit]);
+                    int pending;
+                    do {
+                        pending = m_pending;
+                    } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
+                    pending--;
+                    if (pending == 0)
+                        m_promise.Resolve(m_source.Length);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
+            readonly Func<TSrc, TDst> m_transform;
+            readonly TSrc[] m_source;
+            readonly TDst[] m_dest;
+            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
+                : base(threads) {
+
+                Debug.Assert (source != null);
+                Debug.Assert( transform != null);
+
+                m_next = 0;
+                m_source = source;
+                m_dest = new TDst[source.Length];
+                m_pending = source.Length;
+                m_transform = transform;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<TDst[]> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                int index;
+                unit = -1;
+                do {
+                    index = m_next;
+                    if (index >= m_source.Length)
+                        return false;
+                } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
+
+                unit = index;
+                return true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_dest[unit] = m_transform(m_source[unit]);
+                    int pending;
+                    do {
+                        pending = m_pending;
+                    } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
+                    pending --;
+                    if (pending == 0)
+                        m_promise.Resolve(m_dest);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+
+            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
+            return mapper.Promise;
+        }
+
+        public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (action == null)
+                throw new ArgumentNullException("action");
+
+            var iter = new ArrayIterator<TSrc>(source, action, threads);
+            return iter.Promise;
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/DispatchPool.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -0,0 +1,171 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public abstract class DispatchPool<TUnit> : IDisposable {
+        readonly int m_minThreads;
+        readonly int m_maxThreads;
+        int m_runningThreads = 0;
+        int m_maxRunningThreads = 0;
+        int m_suspended = 0;
+        int m_exitRequired = 0;
+        AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+
+        protected DispatchPool(int min, int max) {
+            if (min < 0)
+                throw new ArgumentOutOfRangeException("min");
+            if (max <= 0)
+                throw new ArgumentOutOfRangeException("max");
+
+            if (min > max)
+                min = max;
+            m_minThreads = min;
+            m_maxThreads = max;
+        }
+
+        protected DispatchPool(int threads)
+            : this(threads, threads) {
+        }
+
+        protected DispatchPool() {
+            int maxThreads, maxCP;
+            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
+
+            m_minThreads = 0;
+            m_maxThreads = maxThreads;
+        }
+
+        protected void InitPool() {
+            for (int i = 0; i < m_minThreads; i++)
+                StartWorker();
+        }
+
+        public int ThreadCount {
+            get {
+                return m_runningThreads;
+            }
+        }
+
+        public int MaxRunningThreads {
+            get {
+                return m_maxRunningThreads;
+            }
+        }
+
+        protected bool IsDisposed {
+            get {
+                return m_exitRequired != 0;
+            }
+        }
+
+        bool StartWorker() {
+            var current = m_runningThreads;
+            // use spins to allocate slot for the new thread
+            do {
+                if (current >= m_maxThreads || m_exitRequired != 0)
+                    // no more slots left or the pool has been disposed
+                    return false;
+            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+
+            m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1);
+
+            // slot successfully allocated
+
+            var worker = new Thread(this.Worker);
+            worker.IsBackground = true;
+            worker.Start();
+
+            return true;
+        }
+
+        protected abstract bool TryDequeue(out TUnit unit);
+
+        protected virtual void WakeNewWorker() {
+            if (m_suspended > 0)
+                m_hasTasks.Set();
+            else
+                StartWorker();
+        }
+
+        bool FetchTask(out TUnit unit) {
+            do {
+                // exit if requested
+                if (m_exitRequired != 0) {
+                    // release the thread slot
+                    int running;
+                    do {
+                        running = m_runningThreads;
+                    } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running));
+                    running--;
+
+                    if (running == 0) // it was the last worker
+                        m_hasTasks.Dispose();
+                    else
+                        m_hasTasks.Set(); // release next worker
+                    unit = default(TUnit);
+                    return false;
+                }
+
+                // fetch task
+                if (TryDequeue(out unit)) {
+                    WakeNewWorker();
+                    return true;
+                }
+
+                //no tasks left, exit if the thread is no longer needed
+                int runningThreads;
+                bool exit = true;
+                do {
+                    runningThreads = m_runningThreads;
+                    if (runningThreads <= m_minThreads) {
+                        exit = false;
+                        break;
+                    }
+                } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
+
+                if (exit) {
+                    Interlocked.Decrement(ref m_runningThreads);
+                    return false;
+                }
+
+                // keep this thread and wait
+                Interlocked.Increment(ref m_suspended);
+                m_hasTasks.WaitOne();
+                Interlocked.Decrement(ref m_suspended);
+            } while (true);
+        }
+
+        protected abstract void InvokeUnit(TUnit unit);
+
+        void Worker() {
+            TUnit unit;
+            while (FetchTask(out unit))
+                InvokeUnit(unit);
+        }
+
+        protected virtual void Dispose(bool disposing) {
+            if (disposing) {
+                if (m_exitRequired == 0) {
+                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
+                        return;
+
+                    // wake sleeping threads
+                    m_hasTasks.Set();
+                    GC.SuppressFinalize(this);
+                }
+            }
+        }
+
+        public void Dispose() {
+            Dispose(true);
+        }
+
+        ~DispatchPool() {
+            Dispose(false);
+        }
+    }
+}
--- a/Implab/Parallels/WorkerPool.cs	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -6,66 +6,35 @@
 using System.Diagnostics;
 
 namespace Implab.Parallels {
-    public class WorkerPool : IDisposable {
-        readonly int m_minThreads;
-        readonly int m_maxThreads;
-        int m_runningThreads;
-        object m_lock = new object();
-
-        bool m_disposed = false;
-
-        // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
-        ManualResetEvent m_hasTasks = new ManualResetEvent(false);
-        Queue<Action> m_queue = new Queue<Action>();
+    public class WorkerPool : DispatchPool<Action> {
 
-        public WorkerPool(int min, int max) {
-            if (min < 0)
-                throw new ArgumentOutOfRangeException("min");
-            if (max <= 0)
-                throw new ArgumentOutOfRangeException("max");
+        MTQueue<Action> m_queue = new MTQueue<Action>();
+        int m_queueLength = 0;
 
-            if (min > max)
-                min = max;
-            m_minThreads = min;
-            m_maxThreads = max;
-
-            InitPool();
+        public WorkerPool(int minThreads, int maxThreads)
+            : base(minThreads, maxThreads) {
+                InitPool();
         }
 
-        public WorkerPool(int max)
-            : this(0, max) {
+        public WorkerPool(int threads)
+            : base(threads) {
+                InitPool();
         }
 
-        public WorkerPool() {
-            int maxThreads, maxCP;
-            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
-
-            m_minThreads = 0;
-            m_maxThreads = maxThreads;
-
-            InitPool();
-        }
-
-        void InitPool() {
-            for (int i = 0; i < m_minThreads; i++)
-                StartWorker();
-        }
-
-        public int ThreadCount {
-            get {
-                return m_runningThreads;
-            }
+        public WorkerPool()
+            : base() {
+                InitPool();
         }
 
         public Promise<T> Invoke<T>(Func<T> task) {
-            if (m_disposed)
-                throw new ObjectDisposedException(ToString());
             if (task == null)
                 throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
 
             var promise = new Promise<T>();
 
-            var queueLen = EnqueueTask(delegate() {
+            EnqueueTask(delegate() {
                 try {
                     promise.Resolve(task());
                 } catch (Exception e) {
@@ -73,99 +42,28 @@
                 }
             });
 
-            if (queueLen > 1)
-                StartWorker();
-
             return promise;
         }
 
-        bool StartWorker() {
-            var current = m_runningThreads;
-            // use spins to allocate slot for the new thread
-            do {
-                if (current >= m_maxThreads)
-                    // no more slots left
-                    return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
-
-            // slot successfully allocated
-
-            var worker = new Thread(this.Worker);
-            worker.IsBackground = true;
-            worker.Start();
-
-            return true;
-        }
-
-        int EnqueueTask(Action task) {
-            Debug.Assert(task != null);
-            lock (m_queue) {
-                m_queue.Enqueue(task);
-                m_hasTasks.Set();
-                return m_queue.Count;
-            }
+        protected void EnqueueTask(Action unit) {
+            Debug.Assert(unit != null);
+            Interlocked.Increment(ref m_queueLength);
+            m_queue.Enqueue(unit);
+            // if there are sleeping threads in the pool wake one
+            // probably this will lead a dry run
+            WakeNewWorker();
         }
 
-        bool FetchTask(out Action task) {
-            task = null;
-
-            while (true) {
-
-                m_hasTasks.WaitOne();
-
-                if (m_disposed)
-                    return false;
-
-                lock (m_queue) {
-                    if (m_queue.Count > 0) {
-                        task = m_queue.Dequeue();
-                        return true;
-                    }
-
-                    // no tasks left
-                    // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
-                    m_hasTasks.Reset();
-                }
-                
-                bool exit = true;
-
-                var current = m_runningThreads;
-                do {
-                    if (current <= m_minThreads) {
-                        exit = false; // this thread should return and wait for the new events
-                        break;
-                    }
-                } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
-
-                if (exit)
-                    return false;
+        protected override bool TryDequeue(out Action unit) {
+            if (m_queue.TryDequeue(out unit)) {
+                Interlocked.Decrement(ref m_queueLength);
+                return true;
             }
+            return false;
         }
 
-        void Worker() {
-            Action task;
-            while (FetchTask(out task))
-                task();
-        }
-
-        protected virtual void Dispose(bool disposing) {
-            if (disposing) {
-                lock (m_lock) {
-                    if (m_disposed)
-                        return;
-                    m_disposed = true;
-                }
-                m_hasTasks.Set();
-                GC.SuppressFinalize(this);
-            }
-        }
-
-        public void Dispose() {
-            Dispose(true);
-        }
-
-        ~WorkerPool() {
-            Dispose(false);
+        protected override void InvokeUnit(Action unit) {
+            unit();
         }
     }
 }
--- a/Implab/Promise.cs	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab/Promise.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -539,5 +539,6 @@
 
             return result;
         }
+
     }
 }