Mercurial > pub > ImplabNet
diff Implab.Test/AsyncTests.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | e943453e5039 |
children | 5a4b735ba669 |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Wed Nov 06 17:49:12 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Thu Nov 07 03:41:32 2013 +0400 @@ -4,71 +4,64 @@ using System.Threading; using Implab.Parallels; -namespace Implab.Test -{ - [TestClass] - public class AsyncTests - { - [TestMethod] - public void ResolveTest () - { - int res = -1; - var p = new Promise<int> (); - p.Then (x => res = x); - p.Resolve (100); +namespace Implab.Test { + [TestClass] + public class AsyncTests { + [TestMethod] + public void ResolveTest() { + int res = -1; + var p = new Promise<int>(); + p.Then(x => res = x); + p.Resolve(100); - Assert.AreEqual (res, 100); - } + Assert.AreEqual(res, 100); + } [TestMethod] - public void RejectTest () - { - int res = -1; - Exception err = null; + public void RejectTest() { + int res = -1; + Exception err = null; - var p = new Promise<int> (); - p.Then (x => res = x, e => err = e); - p.Reject (new ApplicationException ("error")); + var p = new Promise<int>(); + p.Then(x => res = x, e => err = e); + p.Reject(new ApplicationException("error")); - Assert.AreEqual (res, -1); - Assert.AreEqual (err.Message, "error"); + Assert.AreEqual(res, -1); + Assert.AreEqual(err.Message, "error"); - } + } [TestMethod] - public void JoinSuccessTest () - { - var p = new Promise<int> (); - p.Resolve (100); - Assert.AreEqual (p.Join (), 100); - } + public void JoinSuccessTest() { + var p = new Promise<int>(); + p.Resolve(100); + Assert.AreEqual(p.Join(), 100); + } [TestMethod] - public void JoinFailTest () - { - var p = new Promise<int> (); - p.Reject (new ApplicationException ("failed")); + public void JoinFailTest() { + var p = new Promise<int>(); + p.Reject(new ApplicationException("failed")); - try { - p.Join (); - throw new ApplicationException ("WRONG!"); - } catch (TargetInvocationException err) { - Assert.AreEqual (err.InnerException.Message, "failed"); - } catch { - Assert.Fail ("Got wrong excaption"); - } - } + try { + p.Join(); + throw new ApplicationException("WRONG!"); + } catch (TargetInvocationException err) { + Assert.AreEqual(err.InnerException.Message, "failed"); + } catch { + Assert.Fail("Got wrong excaption"); + } + } [TestMethod] - public void MapTest () - { - var p = new Promise<int> (); + public void MapTest() { + var p = new Promise<int>(); - var p2 = p.Map (x => x.ToString ()); - p.Resolve (100); + var p2 = p.Map(x => x.ToString()); + p.Resolve(100); - Assert.AreEqual (p2.Join (), "100"); - } + Assert.AreEqual(p2.Join(), "100"); + } [TestMethod] public void FixErrorTest() { @@ -82,65 +75,90 @@ } [TestMethod] - public void ChainTest () - { - var p1 = new Promise<int> (); + public void ChainTest() { + var p1 = new Promise<int>(); - var p3 = p1.Chain (x => { - var p2 = new Promise<string> (); - p2.Resolve (x.ToString ()); - return p2; - }); + var p3 = p1.Chain(x => { + var p2 = new Promise<string>(); + p2.Resolve(x.ToString()); + return p2; + }); - p1.Resolve (100); + p1.Resolve(100); - Assert.AreEqual (p3.Join (), "100"); - } + Assert.AreEqual(p3.Join(), "100"); + } [TestMethod] - public void PoolTest () - { - var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); + public void PoolTest() { + var pid = Thread.CurrentThread.ManagedThreadId; + var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); - Assert.AreNotEqual (pid, p.Join ()); - } + Assert.AreNotEqual(pid, p.Join()); + } [TestMethod] public void WorkerPoolSizeTest() { - var pool = new WorkerPool(5,10); + var pool = new WorkerPool(5, 10); Assert.AreEqual(5, pool.ThreadCount); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); Assert.AreEqual(5, pool.ThreadCount); for (int i = 0; i < 100; i++) - pool.Invoke(() => { Thread.Sleep(1000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + Thread.Sleep(100); Assert.AreEqual(10, pool.ThreadCount); + + pool.Dispose(); } [TestMethod] public void WorkerPoolCorrectTest() { - var pool = new WorkerPool(5, 20); + var pool = new WorkerPool(); + + int iterations = 1000; + int pending = iterations; + var stop = new ManualResetEvent(false); var count = 0; - for (int i = 0; i < 1000; i++) + for (int i = 0; i < iterations; i++) { pool .Invoke(() => 1) - .Then(x => Interlocked.Add(ref count, x)); + .Then(x => Interlocked.Add(ref count, x)) + .Then(x => Math.Log10(x)) + .Anyway(() => { + Interlocked.Decrement(ref pending); + if (pending == 0) + stop.Set(); + }); + } + + stop.WaitOne(); - Assert.AreEqual(1000, count); + Assert.AreEqual(iterations, count); + Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); + pool.Dispose(); + + } + + [TestMethod] + public void WorkerPoolDisposeTest() { + var pool = new WorkerPool(5, 20); + Assert.AreEqual(5, pool.ThreadCount); + pool.Dispose(); + Thread.Sleep(100); + Assert.AreEqual(0, pool.ThreadCount); + pool.Dispose(); } [TestMethod] public void MTQueueTest() { var queue = new MTQueue<int>(); - var pool = new WorkerPool(5, 20); - int res; queue.Enqueue(10); @@ -169,33 +187,27 @@ var wn = i; AsyncPool .InvokeNewThread(() => { - Console.WriteLine("Started writer: {0}", wn); for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); - Thread.Sleep(1); } - Console.WriteLine("Stopped writer: {0}", wn); return 1; }) - .Then(x => Interlocked.Decrement(ref writers) ); + .Anyway(() => Interlocked.Decrement(ref writers)); } - + for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); var wn = i; AsyncPool .InvokeNewThread(() => { int t; - Console.WriteLine("Started reader: {0}", wn); do { while (queue.TryDequeue(out t)) Interlocked.Add(ref total, t); - Thread.Sleep(0); } while (writers > 0); - Console.WriteLine("Stopped reader: {0}", wn); return 1; }) - .Then(x => { + .Anyway(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); @@ -208,6 +220,55 @@ } [TestMethod] + public void ParallelMapTest() { + + int count = 100000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); + + var t = Environment.TickCount; + var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] + public void ParallelForEachTest() { + + int count = 100000; + + int[] args = new int[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = (int)(rand.NextDouble() * 100); + + int result = 0; + + var t = Environment.TickCount; + args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); + + Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); + + int result2 = 0; + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + result2 += args[i]; + Assert.AreEqual(result2, result); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; @@ -219,7 +280,7 @@ .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) - .Map( y => y ) + .Map(y => y) .Cancelled(() => flags[1] = true) ) .Cancelled(() => flags[2] = true); @@ -228,13 +289,13 @@ try { Assert.AreEqual(p.Join(), "Hi, Alan"); Assert.Fail("Shouldn't get here"); - } catch(OperationCanceledException) { + } catch (OperationCanceledException) { } Assert.IsFalse(flags[0]); Assert.IsTrue(flags[1]); Assert.IsTrue(flags[2]); } - } + } }