diff Implab.Test/AsyncTests.cs @ 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents e943453e5039
children 5a4b735ba669
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Nov 06 17:49:12 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Thu Nov 07 03:41:32 2013 +0400
@@ -4,71 +4,64 @@
 using System.Threading;
 using Implab.Parallels;
 
-namespace Implab.Test
-{
-	[TestClass]
-	public class AsyncTests
-	{
-		[TestMethod]
-		public void ResolveTest ()
-		{
-			int res = -1;
-			var p = new Promise<int> ();
-			p.Then (x => res = x);
-			p.Resolve (100);
+namespace Implab.Test {
+    [TestClass]
+    public class AsyncTests {
+        [TestMethod]
+        public void ResolveTest() {
+            int res = -1;
+            var p = new Promise<int>();
+            p.Then(x => res = x);
+            p.Resolve(100);
 
-			Assert.AreEqual (res, 100);
-		}
+            Assert.AreEqual(res, 100);
+        }
 
         [TestMethod]
-		public void RejectTest ()
-		{
-			int res = -1;
-			Exception err = null;
+        public void RejectTest() {
+            int res = -1;
+            Exception err = null;
 
-			var p = new Promise<int> ();
-			p.Then (x => res = x, e => err = e);
-			p.Reject (new ApplicationException ("error"));
+            var p = new Promise<int>();
+            p.Then(x => res = x, e => err = e);
+            p.Reject(new ApplicationException("error"));
 
-			Assert.AreEqual (res, -1);
-			Assert.AreEqual (err.Message, "error");
+            Assert.AreEqual(res, -1);
+            Assert.AreEqual(err.Message, "error");
 
-		}
+        }
 
         [TestMethod]
-		public void JoinSuccessTest ()
-		{
-			var p = new Promise<int> ();
-			p.Resolve (100);
-			Assert.AreEqual (p.Join (), 100);
-		}
+        public void JoinSuccessTest() {
+            var p = new Promise<int>();
+            p.Resolve(100);
+            Assert.AreEqual(p.Join(), 100);
+        }
 
         [TestMethod]
-		public void JoinFailTest ()
-		{
-			var p = new Promise<int> ();
-			p.Reject (new ApplicationException ("failed"));
+        public void JoinFailTest() {
+            var p = new Promise<int>();
+            p.Reject(new ApplicationException("failed"));
 
-			try {
-				p.Join ();
-				throw new ApplicationException ("WRONG!");
-			} catch (TargetInvocationException err) {
-				Assert.AreEqual (err.InnerException.Message, "failed");
-			} catch {
-				Assert.Fail ("Got wrong excaption");
-			}
-		}
+            try {
+                p.Join();
+                throw new ApplicationException("WRONG!");
+            } catch (TargetInvocationException err) {
+                Assert.AreEqual(err.InnerException.Message, "failed");
+            } catch {
+                Assert.Fail("Got wrong excaption");
+            }
+        }
 
         [TestMethod]
-		public void MapTest ()
-		{
-			var p = new Promise<int> ();
+        public void MapTest() {
+            var p = new Promise<int>();
 
-			var p2 = p.Map (x => x.ToString ());
-			p.Resolve (100);
+            var p2 = p.Map(x => x.ToString());
+            p.Resolve(100);
 
-			Assert.AreEqual (p2.Join (), "100");
-		}
+            Assert.AreEqual(p2.Join(), "100");
+        }
 
         [TestMethod]
         public void FixErrorTest() {
@@ -82,65 +75,90 @@
         }
 
         [TestMethod]
-		public void ChainTest ()
-		{
-			var p1 = new Promise<int> ();
+        public void ChainTest() {
+            var p1 = new Promise<int>();
 
-			var p3 = p1.Chain (x => {
-				var p2 = new Promise<string> ();
-				p2.Resolve (x.ToString ());
-				return p2;
-			});
+            var p3 = p1.Chain(x => {
+                var p2 = new Promise<string>();
+                p2.Resolve(x.ToString());
+                return p2;
+            });
 
-			p1.Resolve (100);
+            p1.Resolve(100);
 
-			Assert.AreEqual (p3.Join (), "100");
-		}
+            Assert.AreEqual(p3.Join(), "100");
+        }
 
         [TestMethod]
-		public void PoolTest ()
-		{
-			var pid = Thread.CurrentThread.ManagedThreadId;
-			var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
+        public void PoolTest() {
+            var pid = Thread.CurrentThread.ManagedThreadId;
+            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
 
-			Assert.AreNotEqual (pid, p.Join ());
-		}
+            Assert.AreNotEqual(pid, p.Join());
+        }
 
         [TestMethod]
         public void WorkerPoolSizeTest() {
-            var pool = new WorkerPool(5,10);
+            var pool = new WorkerPool(5, 10);
 
             Assert.AreEqual(5, pool.ThreadCount);
 
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
 
             Assert.AreEqual(5, pool.ThreadCount);
 
             for (int i = 0; i < 100; i++)
-                pool.Invoke(() => { Thread.Sleep(1000); return 10; });
+                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            Thread.Sleep(100);
             Assert.AreEqual(10, pool.ThreadCount);
+
+            pool.Dispose();
         }
 
         [TestMethod]
         public void WorkerPoolCorrectTest() {
-            var pool = new WorkerPool(5, 20);
+            var pool = new WorkerPool();
+
+            int iterations = 1000;
+            int pending = iterations;
+            var stop = new ManualResetEvent(false);
 
             var count = 0;
-            for (int i = 0; i < 1000; i++)
+            for (int i = 0; i < iterations; i++) {
                 pool
                     .Invoke(() => 1)
-                    .Then(x => Interlocked.Add(ref count, x));
+                    .Then(x => Interlocked.Add(ref count, x))
+                    .Then(x => Math.Log10(x))
+                    .Anyway(() => {
+                        Interlocked.Decrement(ref pending);
+                        if (pending == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
 
-            Assert.AreEqual(1000, count);
+            Assert.AreEqual(iterations, count);
+            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
+            pool.Dispose();
+            
+        }
+
+        [TestMethod]
+        public void WorkerPoolDisposeTest() {
+            var pool = new WorkerPool(5, 20);
+            Assert.AreEqual(5, pool.ThreadCount);
+            pool.Dispose();
+            Thread.Sleep(100);
+            Assert.AreEqual(0, pool.ThreadCount);
+            pool.Dispose();
         }
 
         [TestMethod]
         public void MTQueueTest() {
             var queue = new MTQueue<int>();
-            var pool = new WorkerPool(5, 20);
-
             int res;
 
             queue.Enqueue(10);
@@ -169,33 +187,27 @@
                 var wn = i;
                 AsyncPool
                     .InvokeNewThread(() => {
-                        Console.WriteLine("Started writer: {0}", wn);
                         for (int ii = 0; ii < itemsPerWriter; ii++) {
                             queue.Enqueue(1);
-                            Thread.Sleep(1);
                         }
-                        Console.WriteLine("Stopped writer: {0}", wn);
                         return 1;
                     })
-                    .Then(x => Interlocked.Decrement(ref writers) );
+                    .Anyway(() => Interlocked.Decrement(ref writers));
             }
-            
+
             for (int i = 0; i < 10; i++) {
                 Interlocked.Increment(ref readers);
                 var wn = i;
                 AsyncPool
                     .InvokeNewThread(() => {
                         int t;
-                        Console.WriteLine("Started reader: {0}", wn);
                         do {
                             while (queue.TryDequeue(out t))
                                 Interlocked.Add(ref total, t);
-                            Thread.Sleep(0);
                         } while (writers > 0);
-                        Console.WriteLine("Stopped reader: {0}", wn);
                         return 1;
                     })
-                    .Then(x => {
+                    .Anyway(() => {
                         Interlocked.Decrement(ref readers);
                         if (readers == 0)
                             stop.Set();
@@ -208,6 +220,55 @@
         }
 
         [TestMethod]
+        public void ParallelMapTest() {
+
+            int count = 100000;
+
+            double[] args = new double[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = rand.NextDouble();
+
+            var t = Environment.TickCount;
+            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
+
+            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
+        public void ParallelForEachTest() {
+
+            int count = 100000;
+
+            int[] args = new int[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = (int)(rand.NextDouble() * 100);
+
+            int result = 0;
+
+            var t = Environment.TickCount;
+            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
+
+            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
+
+            int result2 = 0;
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                result2 += args[i];
+            Assert.AreEqual(result2, result);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
         public void ComplexCase1Test() {
             var flags = new bool[3];
 
@@ -219,7 +280,7 @@
                 .Chain(x =>
                     PromiseHelper
                         .Sleep(200, "Hi, " + x)
-                        .Map( y => y )
+                        .Map(y => y)
                         .Cancelled(() => flags[1] = true)
                 )
                 .Cancelled(() => flags[2] = true);
@@ -228,13 +289,13 @@
             try {
                 Assert.AreEqual(p.Join(), "Hi, Alan");
                 Assert.Fail("Shouldn't get here");
-            } catch(OperationCanceledException) {
+            } catch (OperationCanceledException) {
             }
 
             Assert.IsFalse(flags[0]);
             Assert.IsTrue(flags[1]);
             Assert.IsTrue(flags[2]);
         }
-	}
+    }
 }