view Implab.Test/AsyncTests.cs @ 33:b255e4aeef17

Removed the reference to the parent from the promise object; this allows resolved promises to release the parents and results they are holding. Added a complete set of operations to the IPromiseBase interface. Subscribing to the cancellation event of a promise should not affect its IsExclusive property. More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 2fad2d1f4b03
children dabf79fde388
line wrap: on
line source

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

namespace Implab.Test {
    [TestClass]
    public class AsyncTests {
        // A success handler subscribed before the promise is resolved must
        // observe the resolved value by the time Resolve() returns.
        [TestMethod]
        public void ResolveTest() {
            var promise = new Promise<int>();
            int observed = -1;

            promise.Then(value => observed = value);
            promise.Resolve(100);

            Assert.AreEqual(100, observed);
        }

        // Rejecting a promise must invoke the error handler with the rejection
        // error and must leave the success handler uncalled.
        [TestMethod]
        public void RejectTest() {
            int res = -1;
            Exception err = null;

            var p = new Promise<int>();
            p.Then(x => res = x, e => err = e);
            p.Reject(new ApplicationException("error"));

            // Assert.AreEqual takes (expected, actual); this order keeps the
            // failure message accurate.
            Assert.AreEqual(-1, res);
            // Report a missing callback as an assertion failure instead of a
            // NullReferenceException on err.Message.
            Assert.IsNotNull(err, "The error handler was not invoked");
            Assert.AreEqual("error", err.Message);
        }

        // Join() on an already resolved promise must return the resolved value.
        [TestMethod]
        public void JoinSuccessTest() {
            var p = new Promise<int>();
            p.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual(100, p.Join());
        }

        // Join() on a rejected promise must throw a TargetInvocationException
        // whose InnerException is the rejection error.
        [TestMethod]
        public void JoinFailTest() {
            var p = new Promise<int>();
            p.Reject(new ApplicationException("failed"));

            // Capture whatever Join() throws and assert outside the try block,
            // so each failure mode (no exception, wrong exception type, wrong
            // inner error) is reported with its own message.
            Exception caught = null;
            try {
                p.Join();
            } catch (Exception e) {
                caught = e;
            }

            Assert.IsNotNull(caught, "Join() returned normally for a rejected promise");
            Assert.IsInstanceOfType(caught, typeof(TargetInvocationException), "Got wrong exception");
            Assert.IsNotNull(caught.InnerException, "The rejection error was not attached as InnerException");
            Assert.AreEqual("failed", caught.InnerException.Message);
        }

        // Map() must produce a promise that carries the transformed result of
        // the source promise.
        [TestMethod]
        public void MapTest() {
            var p = new Promise<int>();

            var p2 = p.Map(x => x.ToString());
            p.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual("100", p2.Join());
        }

        // An Error() handler that returns a value must turn a rejection of the
        // source promise into a successful result of the derived promise.
        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Error(e => 101);

            p.Reject(new Exception());

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual(101, p2.Join());
        }

        // Chain() must yield a promise that completes with the result of the
        // promise returned by the chained operation.
        [TestMethod]
        public void ChainTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                // The nested promise is already resolved when it is returned.
                var p2 = new Promise<string>();
                p2.Resolve(x.ToString());
                return p2;
            });

            p1.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual("100", p3.Join());
        }

        // Work submitted through AsyncPool.Invoke must run on a thread other
        // than the one that submitted it.
        [TestMethod]
        public void PoolTest() {
            int callerThreadId = Thread.CurrentThread.ManagedThreadId;

            var workerThreadIdPromise = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);

            Assert.AreNotEqual(callerThreadId, workerThreadIdPromise.Join());
        }

        // Checks how the pool size reacts to load: it starts at 5 threads,
        // stays at 5 while only three long-running jobs are queued, and grows
        // to 10 once 100 more blocking jobs are submitted.
        // NOTE(review): the constructor arguments look like (min threads,
        // max threads, queue threshold) - confirm against WorkerPool.
        [TestMethod]
        public void WorkerPoolSizeTest() {
            // 'using' guarantees the pool is disposed even when an assertion
            // fails; otherwise the blocked worker threads would be leaked.
            using (var pool = new WorkerPool(5, 10, 0)) {
                Assert.AreEqual(5, pool.PoolSize);

                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

                Assert.AreEqual(5, pool.PoolSize);

                for (int i = 0; i < 100; i++)
                    pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

                // Give the pool some time to start additional workers.
                Thread.Sleep(200);
                Assert.AreEqual(10, pool.PoolSize);
            }
        }

        // Submits 1000 trivial jobs to a pool and verifies that every
        // continuation chain ran exactly once by summing the job results.
        [TestMethod]
        public void WorkerPoolCorrectTest() {
            // 'using' guarantees the pool is disposed even when the assertion fails.
            using (var pool = new WorkerPool(0, 1000, 100)) {
                int iterations = 1000;
                int pending = iterations;
                var stop = new ManualResetEvent(false);

                var count = 0;
                for (int i = 0; i < iterations; i++) {
                    pool
                        .Invoke(() => 1)
                        .Then(x => Interlocked.Add(ref count, x))
                        .Then(x => Math.Log10(x))
                        .Anyway(() => {
                            // Use the value returned by the atomic decrement
                            // rather than re-reading the shared counter, so
                            // exactly one callback signals completion.
                            if (Interlocked.Decrement(ref pending) == 0)
                                stop.Set();
                        });
                }

                stop.WaitOne();

                Assert.AreEqual(iterations, count);
                Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
            }
        }

        // A disposed pool must report a size of zero after a grace period,
        // and disposing it a second time must not throw.
        [TestMethod]
        public void WorkerPoolDisposeTest() {
            const int initialThreads = 5;

            var workers = new WorkerPool(initialThreads, 20);
            Assert.AreEqual(initialThreads, workers.PoolSize);

            workers.Dispose();
            // Wait before sampling the pool size again.
            Thread.Sleep(500);
            Assert.AreEqual(0, workers.PoolSize);

            // Second call exercises repeated disposal.
            workers.Dispose();
        }

        // Exercises MTQueue<int> first from a single thread (FIFO order and
        // empty-queue behaviour) and then with 3 concurrent writers and 10
        // concurrent readers, checking that no item is lost or duplicated.
        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // A failed dequeue must be reported as such instead of being
                // hidden behind a stale value left in 'res'.
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            int itemsPerWriter = 1000;
            int writersCount = 3;
            int readersCount = 10;

            // Both counters are set to their final value before any thread is
            // started, so neither can drop to zero while threads are still
            // being launched.
            int writers = writersCount;
            int readers = readersCount;
            var stop = new ManualResetEvent(false);
            int total = 0;

            for (int i = 0; i < writersCount; i++) {
                AsyncPool
                    .InvokeNewThread(() => {
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                        }
                        return 1;
                    })
                    .Anyway(() => Interlocked.Decrement(ref writers));
            }

            for (int i = 0; i < readersCount; i++) {
                AsyncPool
                    .InvokeNewThread(() => {
                        int t;
                        do {
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                        } while (Thread.VolatileRead(ref writers) > 0);

                        // Items may have been enqueued between the last failed
                        // dequeue and the moment the writers counter was seen
                        // as zero. No writer is active any more, so one final
                        // drain picks them up.
                        while (queue.TryDequeue(out t))
                            Interlocked.Add(ref total, t);
                        return 1;
                    })
                    .Anyway(() => {
                        // Use the value returned by the atomic decrement so
                        // that exactly one reader signals completion.
                        if (Interlocked.Decrement(ref readers) == 0)
                            stop.Set();
                    });
            }

            stop.WaitOne();

            Assert.AreEqual(itemsPerWriter * writersCount, total);
        }

        // Maps 100000 random doubles through ParallelMap and compares every
        // element with the same function computed sequentially.
        // NOTE(review): the second ParallelMap argument (4) is presumably the
        // number of worker threads - confirm against the extension method.
        [TestMethod]
        public void ParallelMapTest() {
            const int itemCount = 100000;

            var inputs = new double[itemCount];
            var random = new Random();
            for (int idx = 0; idx < itemCount; idx++) {
                inputs[idx] = random.NextDouble();
            }

            // Timed section: the parallel map itself.
            int startedAt = Environment.TickCount;
            var mapped = inputs.ParallelMap(x => Math.Sin(x*x), 4).Join();
            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - startedAt);

            // Timed section: sequential verification of every element.
            startedAt = Environment.TickCount;
            for (int idx = 0; idx < itemCount; idx++) {
                Assert.AreEqual(Math.Sin(inputs[idx] * inputs[idx]), mapped[idx]);
            }
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - startedAt);
        }

        // Maps 10000 random doubles through ChainedMap, where every element is
        // computed by a job submitted to a worker pool, and compares the
        // results with the same function computed sequentially.
        [TestMethod]
        public void ChainedMapTest() {

            using (var pool = new WorkerPool(0,100,0)) {
                const int itemCount = 10000;

                var inputs = new double[itemCount];
                var random = new Random();
                for (int idx = 0; idx < itemCount; idx++) {
                    inputs[idx] = random.NextDouble();
                }

                // Timed section: the chained map itself.
                int startedAt = Environment.TickCount;
                var mapped = inputs
                    .ChainedMap(x => pool.Invoke(() => Math.Sin(x * x)), 4)
                    .Join();
                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - startedAt);

                // Timed section: sequential verification of every element.
                startedAt = Environment.TickCount;
                for (int idx = 0; idx < itemCount; idx++) {
                    Assert.AreEqual(Math.Sin(inputs[idx] * inputs[idx]), mapped[idx]);
                }
                Console.WriteLine("Verified in {0} ms", Environment.TickCount - startedAt);
                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
            }
        }

        // Sums 100000 random integers with ParallelForEach (using an atomic
        // add) and checks the total against a plain sequential sum.
        [TestMethod]
        public void ParallelForEachTest() {

            const int itemCount = 100000;

            var values = new int[itemCount];
            var random = new Random();
            for (int idx = 0; idx < itemCount; idx++) {
                values[idx] = (int)(random.NextDouble() * 100);
            }

            int parallelSum = 0;

            // Timed section: the parallel iteration.
            int startedAt = Environment.TickCount;
            values.ParallelForEach(x => Interlocked.Add(ref parallelSum, x), 4).Join();
            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - startedAt, parallelSum);

            // Timed section: the sequential reference sum and the comparison.
            int sequentialSum = 0;
            startedAt = Environment.TickCount;
            foreach (int value in values) {
                sequentialSum += value;
            }
            Assert.AreEqual(sequentialSum, parallelSum);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - startedAt);
        }

        // Cancels a two-step chain while its second step is in flight and
        // checks which cancellation handlers were notified.
        [TestMethod]
        public void ComplexCase1Test() {
            var flags = new bool[3];

            // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

            var p = PromiseHelper
                .Sleep(200, "Alan")
                .Cancelled(() => flags[0] = true)
                .Chain(x =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + x)
                        .Map(y => y)
                        .Cancelled(() => flags[1] = true)
                )
                .Cancelled(() => flags[2] = true);

            // After 300 ms op1 (200 ms) should be finished and op2 should
            // still be running, so the cancellation hits op2.
            Thread.Sleep(300);
            p.Cancel();
            try {
                // Assert.AreEqual takes (expected, actual).
                Assert.AreEqual("Hi, Alan", p.Join());
                Assert.Fail("Shouldn't get here");
            } catch (OperationCanceledException) {
                // expected: joining a cancelled promise throws
            }

            // op1 had already completed, so its handler must not fire; the
            // nested operation and the outer chain must both see the cancel.
            Assert.IsFalse(flags[0]);
            Assert.IsTrue(flags[1]);
            Assert.IsTrue(flags[2]);
        }

        // When a chained asynchronous operation is cancelled, the whole
        // promise must complete with an OperationCanceledException error.
        [TestMethod]
        public void ChainedCancel1Test() {
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Chain(x => {
                    // start two asynchronous operations
                    var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
                    // the second operation cancels the first one before it completes
                    PromiseHelper
                        .Sleep(100, "HAL, STOP!")
                        .Then(() => result.Cancel());
                    return result;
                });
            try {
                p.Join();
                // Without this the test would pass silently if the cancellation
                // of the nested operation were not propagated to the outer promise.
                Assert.Fail("Join() should throw when the chained operation is cancelled");
            } catch (TargetInvocationException err) {
                Assert.IsTrue(err.InnerException is OperationCanceledException);
            }
        }

        // When a chain of promises is cancelled, the nested operations must
        // be cancelled as well.
        [TestMethod]
        public void ChainedCancel2Test() {
            // Resolved with true when the nested operation is cancelled and
            // with false when it runs to completion.
            var pSurvive = new Promise<bool>();
            // Signalled once the chained callback has started, so the cancel
            // below is issued while the nested operation is pending.
            var hemStarted = new ManualResetEvent(false);
            IPromiseBase p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Chain(x => {
                    hemStarted.Set();
                    // start the nested asynchronous operation
                    var result = PromiseHelper
                        .Sleep(1000, "HEM ENABLED!!!")
                        .Then(s => pSurvive.Resolve(false));

                    result
                        .Cancelled(() => pSurvive.Resolve(true));

                    return result;
                });

            hemStarted.WaitOne();
            p.Cancel();

            try {
                p.Join();
                // Without this the test would pass silently if Join() returned
                // normally after the chain was cancelled.
                Assert.Fail("Join() should throw after the promise chain is cancelled");
            } catch (OperationCanceledException) {
                Assert.IsTrue(pSurvive.Join());
            }
        }
    }
}