view Implab.Test/AsyncTests.cs @ 17:7cd4a843b4e4 promises

Improved worker pool
author cin
date Fri, 08 Nov 2013 01:25:42 +0400
parents 5a4b735ba669
children e3935fdf59a2
line wrap: on
line source

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

namespace Implab.Test {
    [TestClass]
    public class AsyncTests {
        /// <summary>
        /// Checks that a handler registered with <c>Then</c> receives the value
        /// passed to <c>Resolve</c>.
        /// </summary>
        [TestMethod]
        public void ResolveTest() {
            int res = -1;
            var p = new Promise<int>();
            p.Then(x => res = x);
            p.Resolve(100);

            // Assert.AreEqual takes (expected, actual); keeping that order makes
            // the failure message report the values the right way round.
            Assert.AreEqual(100, res);
        }

        /// <summary>
        /// Checks that rejecting a promise invokes the error handler registered
        /// through <c>Then</c> and leaves the success handler uncalled.
        /// </summary>
        [TestMethod]
        public void RejectTest() {
            int res = -1;
            Exception err = null;

            var p = new Promise<int>();
            p.Then(x => res = x, e => err = e);
            p.Reject(new ApplicationException("error"));

            // The success handler must not have run, so the sentinel is untouched.
            Assert.AreEqual(-1, res);

            // Fail with a clear assertion rather than a NullReferenceException
            // when the error handler was never invoked.
            Assert.IsNotNull(err, "The error handler was not invoked");
            Assert.AreEqual("error", err.Message);
        }

        /// <summary>
        /// Checks that <c>Join</c> on an already resolved promise returns the
        /// resolved value.
        /// </summary>
        [TestMethod]
        public void JoinSuccessTest() {
            var p = new Promise<int>();
            p.Resolve(100);

            // Expected value goes first so a failure is reported correctly.
            Assert.AreEqual(100, p.Join());
        }

        /// <summary>
        /// Checks that <c>Join</c> on a rejected promise throws a
        /// <see cref="TargetInvocationException"/> whose inner exception carries
        /// the message of the error passed to <c>Reject</c>.
        /// </summary>
        [TestMethod]
        public void JoinFailTest() {
            var p = new Promise<int>();
            p.Reject(new ApplicationException("failed"));

            try {
                p.Join();
            } catch (TargetInvocationException err) {
                Assert.IsNotNull(err.InnerException, "The original error was not preserved");
                Assert.AreEqual("failed", err.InnerException.Message);
                return;
            } catch (Exception other) {
                // Any other exception type is a contract violation; report what
                // was actually thrown.
                Assert.Fail("Got wrong exception: " + other.GetType().FullName);
            }

            // Reaching this point means Join() returned normally. This is reported
            // separately so it is not mistaken for a "wrong exception" failure.
            Assert.Fail("Join() was expected to throw for a rejected promise");
        }

        /// <summary>
        /// Checks that <c>Map</c> produces a promise which yields the transformed
        /// value once the source promise is resolved.
        /// </summary>
        [TestMethod]
        public void MapTest() {
            var p = new Promise<int>();

            var p2 = p.Map(x => x.ToString());
            p.Resolve(100);

            // Expected value goes first so a failure is reported correctly.
            Assert.AreEqual("100", p2.Join());
        }

        /// <summary>
        /// Checks that a handler attached with <c>Error</c> can supply a
        /// replacement value (101) when the source promise is rejected.
        /// </summary>
        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Error(e => 101);

            p.Reject(new Exception());

            // Expected value goes first so a failure is reported correctly.
            Assert.AreEqual(101, p2.Join());
        }

        /// <summary>
        /// Checks that <c>Chain</c> yields a promise carrying the result of the
        /// promise returned by the chained callback.
        /// </summary>
        [TestMethod]
        public void ChainTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                // The inner promise is resolved before it is handed back.
                var p2 = new Promise<string>();
                p2.Resolve(x.ToString());
                return p2;
            });

            p1.Resolve(100);

            // Expected value goes first so a failure is reported correctly.
            Assert.AreEqual("100", p3.Join());
        }

        /// <summary>
        /// Checks that a delegate passed to <c>AsyncPool.Invoke</c> runs on a
        /// thread other than the one that scheduled it.
        /// </summary>
        [TestMethod]
        public void PoolTest() {
            int callerThreadId = Thread.CurrentThread.ManagedThreadId;

            // The delegate reports the id of whichever thread executes it.
            var pending = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
            var workerThreadId = pending.Join();

            Assert.AreNotEqual(callerThreadId, workerThreadId);
        }

        /// <summary>
        /// Checks how <c>ThreadCount</c> changes under load: it is 5 right after
        /// construction, still 5 with three long-running tasks queued, and 10
        /// after a hundred more are queued.
        /// </summary>
        /// <remarks>
        /// The constructor arguments are presumably (min threads, max threads,
        /// third tuning value) - TODO confirm against WorkerPool.
        /// </remarks>
        [TestMethod]
        public void WorkerPoolSizeTest() {
            // 'using' disposes the pool even when an assertion fails; otherwise
            // the long-sleeping workers would be left behind for the rest of the
            // test run.
            using (var pool = new WorkerPool(5, 10, 0)) {
                Assert.AreEqual(5, pool.ThreadCount);

                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });

                Assert.AreEqual(5, pool.ThreadCount);

                for (int i = 0; i < 100; i++)
                    pool.Invoke(() => { Thread.Sleep(1000000); return 10; });

                // Short pause before sampling the count; presumably lets the pool
                // finish starting extra workers.
                Thread.Sleep(100);
                Assert.AreEqual(10, pool.ThreadCount);
            }
        }

        /// <summary>
        /// Queues 1000 small tasks on a worker pool and checks that every one of
        /// them contributed to the shared counter.
        /// </summary>
        [TestMethod]
        public void WorkerPoolCorrectTest() {
            // 'using' disposes the pool even when the assertion below fails.
            using (var pool = new WorkerPool(0,1000,100)) {
                int iterations = 1000;
                int pending = iterations;
                var stop = new ManualResetEvent(false);

                var count = 0;
                for (int i = 0; i < iterations; i++) {
                    pool
                        .Invoke(() => 1)
                        .Then(x => Interlocked.Add(ref count, x))
                        .Then(x => Math.Log10(x))
                        .Anyway(() => {
                            // Use the value returned by the atomic decrement
                            // instead of re-reading the shared field, so exactly
                            // one callback sees zero and signals completion.
                            if (Interlocked.Decrement(ref pending) == 0)
                                stop.Set();
                        });
                }

                stop.WaitOne();

                Assert.AreEqual(iterations, count);
                Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// Checks that <c>ThreadCount</c> drops from 5 to 0 after the pool is
        /// disposed, and that <c>Dispose</c> can be called a second time.
        /// </summary>
        [TestMethod]
        public void WorkerPoolDisposeTest() {
            var workers = new WorkerPool(5, 20);
            Assert.AreEqual(5, workers.ThreadCount);

            workers.Dispose();

            // Short pause before sampling the count; presumably lets the worker
            // threads wind down.
            Thread.Sleep(100);
            Assert.AreEqual(0, workers.ThreadCount);

            // Repeated disposal of the same pool.
            workers.Dispose();
        }

        /// <summary>
        /// Exercises <c>MTQueue</c> in three stages: a single enqueue/dequeue,
        /// FIFO ordering for 1000 sequential items, and a concurrent run with
        /// 3 writer threads and 10 reader threads whose dequeued values must add
        /// up to the number of items written.
        /// </summary>
        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();
            int res;

            // Single item: dequeue succeeds once, then the queue reports empty.
            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // A failed dequeue must be reported as such, not as a value
                // mismatch on a stale 'res'.
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            int writers = 0;
            int readers = 0;
            var stop = new ManualResetEvent(false);
            int total = 0;

            int itemsPerWriter = 1000;
            int writersCount = 3;

            for (int i = 0; i < writersCount; i++) {
                Interlocked.Increment(ref writers);
                AsyncPool
                    .InvokeNewThread(() => {
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                        }
                        return 1;
                    })
                    .Anyway(() => Interlocked.Decrement(ref writers));
            }

            for (int i = 0; i < 10; i++) {
                Interlocked.Increment(ref readers);
                AsyncPool
                    .InvokeNewThread(() => {
                        int t;
                        bool writersActive;
                        do {
                            // Sample the writer count BEFORE draining: once it is
                            // seen as zero every item is already enqueued, so the
                            // drain that follows cannot miss a late item.
                            writersActive = Interlocked.CompareExchange(ref writers, 0, 0) > 0;
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                        } while (writersActive);
                        return 1;
                    })
                    .Anyway(() => {
                        // Use the value returned by the atomic decrement rather
                        // than re-reading the shared counter.
                        if (Interlocked.Decrement(ref readers) == 0)
                            stop.Set();
                    });
            }

            stop.WaitOne();

            Assert.AreEqual(itemsPerWriter * writersCount, total);
        }

        /// <summary>
        /// Maps 100000 random doubles through <c>ParallelMap</c> (second argument
        /// 4) and compares each result with the same function computed
        /// sequentially. Timings for both phases are written to the console.
        /// </summary>
        [TestMethod]
        public void ParallelMapTest() {

            int count = 100000;

            var rand = new Random();
            double[] args = new double[count];

            for (int idx = 0; idx < count; idx++) {
                args[idx] = rand.NextDouble();
            }

            var t = Environment.TickCount;
            var mapping = args.ParallelMap(x => Math.Sin(x*x), 4);
            var res = mapping.Join();

            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);

            // Recompute every value on this thread and compare element by element.
            t = Environment.TickCount;
            for (int idx = 0; idx < count; idx++) {
                double input = args[idx];
                double expected = Math.Sin(input * input);
                Assert.AreEqual(expected, res[idx]);
            }
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
        }

        /// <summary>
        /// Maps 10000 random doubles through <c>ChainedMap</c>, where each element
        /// is computed by a task queued on a dedicated worker pool, and compares
        /// each result with the same function computed sequentially. Timings and
        /// the pool's <c>MaxRunningThreads</c> are written to the console.
        /// </summary>
        [TestMethod]
        public void ChainedMapTest() {

            using (var pool = new WorkerPool(8,100,0)) {
                int count = 10000;

                var rand = new Random();
                double[] args = new double[count];

                for (int idx = 0; idx < count; idx++) {
                    args[idx] = rand.NextDouble();
                }

                var t = Environment.TickCount;

                // Each element is handed to the pool; the callback returns the
                // promise produced by pool.Invoke.
                var mapping = args.ChainedMap(
                    x => pool.Invoke(() => Math.Sin(x * x)),
                    4
                );
                var res = mapping.Join();

                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);

                // Recompute every value on this thread and compare element by element.
                t = Environment.TickCount;
                for (int idx = 0; idx < count; idx++) {
                    double input = args[idx];
                    double expected = Math.Sin(input * input);
                    Assert.AreEqual(expected, res[idx]);
                }
                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// Sums 100000 random integers through <c>ParallelForEach</c> (second
        /// argument 4) using an interlocked accumulator and compares the total
        /// with a plain sequential sum. Timings are written to the console.
        /// </summary>
        [TestMethod]
        public void ParallelForEachTest() {

            int count = 100000;

            var rand = new Random();
            int[] args = new int[count];

            for (int idx = 0; idx < count; idx++) {
                args[idx] = (int)(rand.NextDouble() * 100);
            }

            int result = 0;

            var t = Environment.TickCount;
            var iteration = args.ParallelForEach(x => Interlocked.Add(ref result, x), 4);
            iteration.Join();

            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);

            // Reference total computed on this thread.
            int result2 = 0;

            t = Environment.TickCount;
            foreach (int value in args) {
                result2 += value;
            }
            Assert.AreEqual(result2, result);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
        }

        /// <summary>
        /// Cancels a chained promise part-way through and checks which
        /// <c>Cancelled</c> handlers fired: the one on the first operation must
        /// not, while the ones on the second operation and on the outer chain
        /// must. <c>Join</c> is expected to throw
        /// <see cref="OperationCanceledException"/>.
        /// </summary>
        [TestMethod]
        public void ComplexCase1Test() {
            // One flag per Cancelled handler: [0] op1, [1] op2, [2] the whole chain.
            var flags = new bool[3];

            // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

            var p = PromiseHelper
                .Sleep(200, "Alan")
                .Cancelled(() => flags[0] = true)
                .Chain(x =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + x)
                        .Map(y => y)
                        .Cancelled(() => flags[1] = true)
                )
                .Cancelled(() => flags[2] = true);

            // Cancel after 300 ms. Going by the timings in the comment above, op1
            // should be done and op2 still pending at that point (assumes
            // PromiseHelper.Sleep(ms, value) completes after ms - TODO confirm).
            Thread.Sleep(300);
            p.Cancel();
            try {
                // Expected value goes first so a failure is reported correctly.
                Assert.AreEqual("Hi, Alan", p.Join());
                Assert.Fail("Shouldn't get here");
            } catch (OperationCanceledException) {
                // Expected: joining a cancelled promise throws.
            }

            Assert.IsFalse(flags[0]);
            Assert.IsTrue(flags[1]);
            Assert.IsTrue(flags[2]);
        }
    }
}