Mercurial > pub > ImplabNet
changeset 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | e943453e5039 |
children | 5a4b735ba669 |
files | Implab.Test/AsyncTests.cs Implab.v11.suo Implab/Implab.csproj Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs |
diffstat | 7 files changed, 506 insertions(+), 221 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Wed Nov 06 17:49:12 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Thu Nov 07 03:41:32 2013 +0400 @@ -4,71 +4,64 @@ using System.Threading; using Implab.Parallels; -namespace Implab.Test -{ - [TestClass] - public class AsyncTests - { - [TestMethod] - public void ResolveTest () - { - int res = -1; - var p = new Promise<int> (); - p.Then (x => res = x); - p.Resolve (100); +namespace Implab.Test { + [TestClass] + public class AsyncTests { + [TestMethod] + public void ResolveTest() { + int res = -1; + var p = new Promise<int>(); + p.Then(x => res = x); + p.Resolve(100); - Assert.AreEqual (res, 100); - } + Assert.AreEqual(res, 100); + } [TestMethod] - public void RejectTest () - { - int res = -1; - Exception err = null; + public void RejectTest() { + int res = -1; + Exception err = null; - var p = new Promise<int> (); - p.Then (x => res = x, e => err = e); - p.Reject (new ApplicationException ("error")); + var p = new Promise<int>(); + p.Then(x => res = x, e => err = e); + p.Reject(new ApplicationException("error")); - Assert.AreEqual (res, -1); - Assert.AreEqual (err.Message, "error"); + Assert.AreEqual(res, -1); + Assert.AreEqual(err.Message, "error"); - } + } [TestMethod] - public void JoinSuccessTest () - { - var p = new Promise<int> (); - p.Resolve (100); - Assert.AreEqual (p.Join (), 100); - } + public void JoinSuccessTest() { + var p = new Promise<int>(); + p.Resolve(100); + Assert.AreEqual(p.Join(), 100); + } [TestMethod] - public void JoinFailTest () - { - var p = new Promise<int> (); - p.Reject (new ApplicationException ("failed")); + public void JoinFailTest() { + var p = new Promise<int>(); + p.Reject(new ApplicationException("failed")); - try { - p.Join (); - throw new ApplicationException ("WRONG!"); - } catch (TargetInvocationException err) { - Assert.AreEqual (err.InnerException.Message, "failed"); - } catch { - Assert.Fail ("Got wrong excaption"); - } - } + try { + p.Join(); + throw new ApplicationException("WRONG!"); + } catch (TargetInvocationException err) { + Assert.AreEqual(err.InnerException.Message, "failed"); + } catch { + Assert.Fail("Got wrong excaption"); + } + } [TestMethod] - public void MapTest () - { - var p = new Promise<int> (); + public void MapTest() { + var p = new Promise<int>(); - var p2 = p.Map (x => x.ToString ()); - p.Resolve (100); + var p2 = p.Map(x => x.ToString()); + p.Resolve(100); - Assert.AreEqual (p2.Join (), "100"); - } + Assert.AreEqual(p2.Join(), "100"); + } [TestMethod] public void FixErrorTest() { @@ -82,65 +75,90 @@ } [TestMethod] - public void ChainTest () - { - var p1 = new Promise<int> (); + public void ChainTest() { + var p1 = new Promise<int>(); - var p3 = p1.Chain (x => { - var p2 = new Promise<string> (); - p2.Resolve (x.ToString ()); - return p2; - }); + var p3 = p1.Chain(x => { + var p2 = new Promise<string>(); + p2.Resolve(x.ToString()); + return p2; + }); - p1.Resolve (100); + p1.Resolve(100); - Assert.AreEqual (p3.Join (), "100"); - } + Assert.AreEqual(p3.Join(), "100"); + } [TestMethod] - public void PoolTest () - { - var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); + public void PoolTest() { + var pid = Thread.CurrentThread.ManagedThreadId; + var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); - Assert.AreNotEqual (pid, p.Join ()); - } + Assert.AreNotEqual(pid, p.Join()); + } [TestMethod] public void WorkerPoolSizeTest() { - var pool = new WorkerPool(5,10); + var pool = new WorkerPool(5, 10); Assert.AreEqual(5, pool.ThreadCount); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); Assert.AreEqual(5, pool.ThreadCount); for (int i = 0; i < 100; i++) - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + Thread.Sleep(100); Assert.AreEqual(10, pool.ThreadCount); + + pool.Dispose(); } [TestMethod] public void WorkerPoolCorrectTest() { - var pool = new WorkerPool(5, 20); + var pool = new WorkerPool(); + + int iterations = 1000; + int pending = iterations; + var stop = new ManualResetEvent(false); var count = 0; - for (int i = 0; i < 1000; i++) + for (int i = 0; i < iterations; i++) { pool .Invoke(() => 1) - .Then(x => Interlocked.Add(ref count, x)); + .Then(x => Interlocked.Add(ref count, x)) + .Then(x => Math.Log10(x)) + .Anyway(() => { + Interlocked.Decrement(ref pending); + if (pending == 0) + stop.Set(); + }); + } + + stop.WaitOne(); - Assert.AreEqual(1000, count); + Assert.AreEqual(iterations, count); + Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); + pool.Dispose(); + + } + + [TestMethod] + public void WorkerPoolDisposeTest() { + var pool = new WorkerPool(5, 20); + Assert.AreEqual(5, pool.ThreadCount); + pool.Dispose(); + Thread.Sleep(100); + Assert.AreEqual(0, pool.ThreadCount); + pool.Dispose(); } [TestMethod] public void MTQueueTest() { var queue = new MTQueue<int>(); - var pool = new WorkerPool(5, 20); - int res; queue.Enqueue(10); @@ -169,33 +187,27 @@ var wn = i; AsyncPool .InvokeNewThread(() => { - Console.WriteLine("Started writer: {0}", wn); for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); - Thread.Sleep(1); } - Console.WriteLine("Stopped writer: {0}", wn); return 1; }) - .Then(x => Interlocked.Decrement(ref writers) ); + .Anyway(() => Interlocked.Decrement(ref writers)); } - + for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); var wn = i; AsyncPool .InvokeNewThread(() => { int t; - Console.WriteLine("Started reader: {0}", wn); do { while (queue.TryDequeue(out t)) Interlocked.Add(ref total, t); - Thread.Sleep(0); } while (writers > 0); - Console.WriteLine("Stopped reader: {0}", wn); return 1; }) - .Then(x => { + .Anyway(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); @@ -208,6 +220,55 @@ } [TestMethod] + public void ParallelMapTest() { + + int count = 100000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); + + var t = Environment.TickCount; + var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] + public void ParallelForEachTest() { + + int count = 100000; + + int[] args = new int[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = (int)(rand.NextDouble() * 100); + + int result = 0; + + var t = Environment.TickCount; + args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); + + Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); + + int result2 = 0; + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + result2 += args[i]; + Assert.AreEqual(result2, result); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; @@ -219,7 +280,7 @@ .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) - .Map( y => y ) + .Map(y => y) .Cancelled(() => flags[1] = true) ) .Cancelled(() => flags[2] = true); @@ -228,13 +289,13 @@ try { Assert.AreEqual(p.Join(), "Hi, Alan"); Assert.Fail("Shouldn't get here"); - } catch(OperationCanceledException) { + } catch (OperationCanceledException) { } Assert.IsFalse(flags[0]); Assert.IsTrue(flags[1]); Assert.IsTrue(flags[2]); } - } + } }
--- a/Implab/Implab.csproj Wed Nov 06 17:49:12 2013 +0400 +++ b/Implab/Implab.csproj Thu Nov 07 03:41:32 2013 +0400 @@ -38,6 +38,8 @@ <Compile Include="IPromise.cs" /> <Compile Include="ITaskController.cs" /> <Compile Include="ManagedPromise.cs" /> + <Compile Include="Parallels\DispatchPool.cs" /> + <Compile Include="Parallels\ArrayTraits.cs" /> <Compile Include="Parallels\MTQueue.cs" /> <Compile Include="Parallels\WorkerPool.cs" /> <Compile Include="PromiseState.cs" />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/ArrayTraits.cs Thu Nov 07 03:41:32 2013 +0400 @@ -0,0 +1,152 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public static class ArrayTraits { + class ArrayIterator<TSrc> : DispatchPool<int> { + readonly Action<TSrc> m_action; + readonly TSrc[] m_source; + readonly Promise<int> m_promise = new Promise<int>(); + + int m_pending; + int m_next; + + public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) + : base(threads) { + + Debug.Assert(source != null); + Debug.Assert(action != null); + + m_next = 0; + m_source = source; + m_pending = source.Length; + m_action = action; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<int> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + int index; + unit = -1; + do { + index = m_next; + if (index >= m_source.Length) + return false; + } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); + + unit = index; + return true; + } + + protected override void InvokeUnit(int unit) { + try { + m_action(m_source[unit]); + int pending; + do { + pending = m_pending; + } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); + pending--; + if (pending == 0) + m_promise.Resolve(m_source.Length); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + class ArrayMapper<TSrc, TDst>: DispatchPool<int> { + readonly Func<TSrc, TDst> m_transform; + readonly TSrc[] m_source; + readonly TDst[] m_dest; + readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); + + int m_pending; + int m_next; + + public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) + : base(threads) { + + Debug.Assert (source != null); + Debug.Assert( transform != null); + + m_next = 0; + m_source = source; + m_dest = new TDst[source.Length]; + m_pending = source.Length; + m_transform = transform; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<TDst[]> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + int index; + unit = -1; + do { + index = m_next; + if (index >= m_source.Length) + return false; + } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); + + unit = index; + return true; + } + + protected override void InvokeUnit(int unit) { + try { + m_dest[unit] = m_transform(m_source[unit]); + int pending; + do { + pending = m_pending; + } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); + pending --; + if (pending == 0) + m_promise.Resolve(m_dest); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + + var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); + return mapper.Promise; + } + + public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (action == null) + throw new ArgumentNullException("action"); + + var iter = new ArrayIterator<TSrc>(source, action, threads); + return iter.Promise; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/DispatchPool.cs Thu Nov 07 03:41:32 2013 +0400 @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public abstract class DispatchPool<TUnit> : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads = 0; + int m_maxRunningThreads = 0; + int m_suspended = 0; + int m_exitRequired = 0; + AutoResetEvent m_hasTasks = new AutoResetEvent(false); + + protected DispatchPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + } + + protected DispatchPool(int threads) + : this(threads, threads) { + } + + protected DispatchPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + } + + protected void InitPool() { + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public int ThreadCount { + get { + return m_runningThreads; + } + } + + public int MaxRunningThreads { + get { + return m_maxRunningThreads; + } + } + + protected bool IsDisposed { + get { + return m_exitRequired != 0; + } + } + + bool StartWorker() { + var current = m_runningThreads; + // use spins to allocate slot for the new thread + do { + if (current >= m_maxThreads || m_exitRequired != 0) + // no more slots left or the pool has been disposed + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); + + // slot successfully allocated + + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } + + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual void WakeNewWorker() { + if (m_suspended > 0) + m_hasTasks.Set(); + else + StartWorker(); + } + + bool FetchTask(out TUnit unit) { + do { + // exit if requested + if (m_exitRequired != 0) { + // release the thread slot + int running; + do { + running = m_runningThreads; + } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); + running--; + + if (running == 0) // it was the last worker + m_hasTasks.Dispose(); + else + m_hasTasks.Set(); // release next worker + unit = default(TUnit); + return false; + } + + // fetch task + if (TryDequeue(out unit)) { + WakeNewWorker(); + return true; + } + + //no tasks left, exit if the thread is no longer needed + int runningThreads; + bool exit = true; + do { + runningThreads = m_runningThreads; + if (runningThreads <= m_minThreads) { + exit = false; + break; + } + } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); + + if (exit) { + Interlocked.Decrement(ref m_runningThreads); + return false; + } + + // keep this thread and wait + Interlocked.Increment(ref m_suspended); + m_hasTasks.WaitOne(); + Interlocked.Decrement(ref m_suspended); + } while (true); + } + + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + while (FetchTask(out unit)) + InvokeUnit(unit); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + if (m_exitRequired == 0) { + if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) + return; + + // wake sleeping threads + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + } + + public void Dispose() { + Dispose(true); + } + + ~DispatchPool() { + Dispose(false); + } + } +}
--- a/Implab/Parallels/WorkerPool.cs Wed Nov 06 17:49:12 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Thu Nov 07 03:41:32 2013 +0400 @@ -6,66 +6,35 @@ using System.Diagnostics; namespace Implab.Parallels { - public class WorkerPool : IDisposable { - readonly int m_minThreads; - readonly int m_maxThreads; - int m_runningThreads; - object m_lock = new object(); - - bool m_disposed = false; - - // this event will signal that workers can try to fetch a task from queue or the pool has been disposed - ManualResetEvent m_hasTasks = new ManualResetEvent(false); - Queue<Action> m_queue = new Queue<Action>(); + public class WorkerPool : DispatchPool<Action> { - public WorkerPool(int min, int max) { - if (min < 0) - throw new ArgumentOutOfRangeException("min"); - if (max <= 0) - throw new ArgumentOutOfRangeException("max"); + MTQueue<Action> m_queue = new MTQueue<Action>(); + int m_queueLength = 0; - if (min > max) - min = max; - m_minThreads = min; - m_maxThreads = max; - - InitPool(); + public WorkerPool(int minThreads, int maxThreads) + : base(minThreads, maxThreads) { + InitPool(); } - public WorkerPool(int max) - : this(0, max) { + public WorkerPool(int threads) + : base(threads) { + InitPool(); } - public WorkerPool() { - int maxThreads, maxCP; - ThreadPool.GetMaxThreads(out maxThreads, out maxCP); - - m_minThreads = 0; - m_maxThreads = maxThreads; - - InitPool(); - } - - void InitPool() { - for (int i = 0; i < m_minThreads; i++) - StartWorker(); - } - - public int ThreadCount { - get { - return m_runningThreads; - } + public WorkerPool() + : base() { + InitPool(); } public Promise<T> Invoke<T>(Func<T> task) { - if (m_disposed) - throw new ObjectDisposedException(ToString()); if (task == null) throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); var promise = new Promise<T>(); - var queueLen = EnqueueTask(delegate() { + EnqueueTask(delegate() { try { promise.Resolve(task()); } catch (Exception e) { @@ -73,99 +42,28 @@ } }); - if (queueLen > 1) - StartWorker(); - return promise; } - bool StartWorker() { - var current = m_runningThreads; - // use spins to allocate slot for the new thread - do { - if (current >= m_maxThreads) - // no more slots left - return false; - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); - - // slot successfully allocated - - var worker = new Thread(this.Worker); - worker.IsBackground = true; - worker.Start(); - - return true; - } - - int EnqueueTask(Action task) { - Debug.Assert(task != null); - lock (m_queue) { - m_queue.Enqueue(task); - m_hasTasks.Set(); - return m_queue.Count; - } + protected void EnqueueTask(Action unit) { + Debug.Assert(unit != null); + Interlocked.Increment(ref m_queueLength); + m_queue.Enqueue(unit); + // if there are sleeping threads in the pool wake one + // probably this will lead a dry run + WakeNewWorker(); } - bool FetchTask(out Action task) { - task = null; - - while (true) { - - m_hasTasks.WaitOne(); - - if (m_disposed) - return false; - - lock (m_queue) { - if (m_queue.Count > 0) { - task = m_queue.Dequeue(); - return true; - } - - // no tasks left - // signal that no more tasks left, current lock ensures that this event won't suppress newly added task - m_hasTasks.Reset(); - } - - bool exit = true; - - var current = m_runningThreads; - do { - if (current <= m_minThreads) { - exit = false; // this thread should return and wait for the new events - break; - } - } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); - - if (exit) - return false; + protected override bool TryDequeue(out Action unit) { + if (m_queue.TryDequeue(out unit)) { + Interlocked.Decrement(ref m_queueLength); + return true; } + return false; } - void Worker() { - Action task; - while (FetchTask(out task)) - task(); - } - - protected virtual void Dispose(bool disposing) { - if (disposing) { - lock (m_lock) { - if (m_disposed) - return; - m_disposed = true; - } - m_hasTasks.Set(); - GC.SuppressFinalize(this); - } - } - - public void Dispose() { - Dispose(true); - } - - ~WorkerPool() { - Dispose(false); + protected override void InvokeUnit(Action unit) { + unit(); } } }