view Implab.Test/AsyncTests.cs @ 196:40d7fed4a09e

fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents ec91a6dfa5b3
children 8200ab154c8a
line wrap: on
line source

using System;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

#if MONO

using NUnit.Framework;
using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
using TestMethodAttribute = NUnit.Framework.TestAttribute;

#else

using Microsoft.VisualStudio.TestTools.UnitTesting;

#endif

namespace Implab.Test {
    [TestClass]
    public class AsyncTests {
        [TestMethod]
        public void ResolveTest() {
            int res = -1;
            var p = new Promise<int>();
            p.Then(x => res = x);
            p.Resolve(100);

            Assert.AreEqual(100, res);
        }

        [TestMethod]
        public void RejectTest() {
            int res = -1;
            Exception err = null;

            var p = new Promise<int>();
            p.Then(
                x => res = x,
                e => {
                    err = e;
                    return -2;
                }
            );
            p.Reject(new ApplicationException("error"));

            Assert.AreEqual(res, -1);
            Assert.AreEqual(err.Message, "error");

        }

        [TestMethod]
        public void CancelExceptionTest() {
            var p = new Promise<bool>();
            p.CancelOperation(null);

            var p2 = p.Then(x => x, null, reason => {
                throw new ApplicationException("CANCELLED"); 
            });

            try {
                p2.Join();
                Assert.Fail();
            } catch (ApplicationException err) {
                Assert.AreEqual("CANCELLED", err.InnerException.Message);
            }

        }

        [TestMethod]
        public void ContinueOnCancelTest() {
            var p = new Promise<bool>();
            p.CancelOperation(null);

            var p2 = p
                .Then(x => x, null, reason => {
                    throw new ApplicationException("CANCELLED");
                })
                .Then(x => x, e => true);

            Assert.AreEqual(true, p2.Join());
        }

        [TestMethod]
        public void JoinSuccessTest() {
            var p = new Promise<int>();
            p.Resolve(100);
            Assert.AreEqual(p.Join(), 100);
        }

        [TestMethod]
        public void JoinFailTest() {
            var p = new Promise<int>();
            p.Reject(new ApplicationException("failed"));

            try {
                p.Join();
                throw new ApplicationException("WRONG!");
            } catch (TargetInvocationException err) {
                Assert.AreEqual(err.InnerException.Message, "failed");
            } catch {
                Assert.Fail("Got wrong excaption");
            }
        }

        [TestMethod]
        public void MapTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x.ToString());
            p.Resolve(100);

            Assert.AreEqual(p2.Join(), "100");
        }

        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x, e => 101);

            p.Reject(new Exception());

            Assert.AreEqual(p2.Join(), 101);
        }

        [TestMethod]
        public void ChainTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                var p2 = new Promise<string>();
                p2.Resolve(x.ToString());
                return p2;
            });

            p1.Resolve(100);

            Assert.AreEqual(p3.Join(), "100");
        }

        [TestMethod]
        public void ChainFailTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                var p2 = new Promise<string>();
                p2.Reject(new Exception("DIE!!!"));
                return p2;
            });

            p1.Resolve(100);

            Assert.IsTrue(p3.IsResolved);
        }

        [TestMethod]
        public void PoolTest() {
            var pid = Thread.CurrentThread.ManagedThreadId;
            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);

            Assert.AreNotEqual(pid, p.Join());
        }

        [TestMethod]
        public void WorkerPoolSizeTest() {
            var pool = new WorkerPool(5, 10, 1);

            Assert.AreEqual(5, pool.PoolSize);

            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

            Assert.AreEqual(5, pool.PoolSize);

            for (int i = 0; i < 100; i++)
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
            Thread.Sleep(200);
            Assert.AreEqual(10, pool.PoolSize);

            pool.Dispose();
        }

        [TestMethod]
        public void WorkerPoolCorrectTest() {
            var pool = new WorkerPool(0,1000,100);

            const int iterations = 1000;
            int pending = iterations;
            var stop = new ManualResetEvent(false);

            var count = 0;
            for (int i = 0; i < iterations; i++) {
                pool
                    .Invoke(() => 1)
                    .Then(x => Interlocked.Add(ref count, x))
                    .Then(x => Math.Log10(x))
                    .On(() => {
                        Interlocked.Decrement(ref pending);
                        if (pending == 0)
                            stop.Set();
                    }, PromiseEventType.All);
            }

            stop.WaitOne();

            Assert.AreEqual(iterations, count);
            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
            pool.Dispose();
            
        }

        [TestMethod]
        public void WorkerPoolDisposeTest() {
            var pool = new WorkerPool(5, 20);
            Assert.AreEqual(5, pool.PoolSize);
            pool.Dispose();
            Thread.Sleep(500);
            Assert.AreEqual(0, pool.PoolSize);
            pool.Dispose();
        }

        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                queue.TryDequeue(out res);
                Assert.AreEqual(i, res);
            }

            int writers = 0;
            int readers = 0;
            var stop = new ManualResetEvent(false);
            int total = 0;

            const int itemsPerWriter = 10000;
            const int writersCount = 10;

            for (int i = 0; i < writersCount; i++) {
                Interlocked.Increment(ref writers);
                AsyncPool
                    .RunThread(() => {
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                        }
                        return 1;
                    })
                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
            }

            for (int i = 0; i < 10; i++) {
                Interlocked.Increment(ref readers);
                AsyncPool
                    .RunThread(() => {
                        int t;
                        do {
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                        } while (writers > 0);
                        return 1;
                    })
                    .On(() => {
                        Interlocked.Decrement(ref readers);
                        if (readers == 0)
                            stop.Set();
                    }, PromiseEventType.All);
            }

            stop.WaitOne();

            Assert.AreEqual(100000, total);
        }

        [TestMethod]
        public void AsyncQueueTest() {
            var queue = new AsyncQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                queue.TryDequeue(out res);
                Assert.AreEqual(i, res);
            }

            const int count = 10000000;

            int res1 = 0, res2 = 0;
            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(2);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp;
                    int i = 0;
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res1 += temp;
                        }
                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp;
                    int i = 0;
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res2 += temp;
                        }
                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(count * 3, res1 + res2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                res1,
                res2,
                res1 + res2,
                count
            );
        }

        [TestMethod]
        public void AsyncQueueBatchTest() {
            var queue = new AsyncQueue<int>();

            const int wBatch = 29;
            const int wCount = 400000;
            const int total = wBatch * wCount * 2;
            const int summ = wBatch * wCount * 3;

            int r1 = 0, r2 = 0;
            const int rBatch = 111;
            int read = 0;

            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 1;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 2;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[rBatch];

                    while(read < total) {
                        int actual;
                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
                            for(int i=0; i< actual; i++)
                                r1 += buffer[i];
                            Interlocked.Add(ref read, actual);
                        }
                    }

                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[rBatch];

                    while(read < total) {
                        int actual;
                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
                            for(int i=0; i< actual; i++)
                                r2 += buffer[i];
                            Interlocked.Add(ref read, actual);
                        }
                    }

                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(summ , r1 + r2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                r1,
                r2,
                r1 + r2,
                total
            );
        }

        [TestMethod]
        public void AsyncQueueChunkDequeueTest() {
            var queue = new AsyncQueue<int>();

            const int wBatch = 31;
            const int wCount = 200000;
            const int total = wBatch * wCount * 3;
            const int summ = wBatch * wCount * 6;

            int r1 = 0, r2 = 0;
            const int rBatch = 1024;
            int read = 0;

            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 1;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 2;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 3;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[rBatch];
                    int count = 1;
                    double avgchunk = 0;
                    while(read < total) {
                        int actual;
                        if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
                            for(int i=0; i< actual; i++)
                                r2 += buffer[i];
                            Interlocked.Add(ref read, actual);
                            avgchunk = avgchunk*(count-1)/count + actual/(double)count;
                            count ++;
                        }
                    }

                    Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(summ , r1 + r2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                r1,
                r2,
                r1 + r2,
                total
            );
        }

        [TestMethod]
        public void AsyncQueueDrainTest() {
            var queue = new AsyncQueue<int>();

            const int wBatch = 11;
            const int wCount = 200000;
            const int total = wBatch * wCount * 3;
            const int summ = wBatch * wCount * 3;

            int r1 = 0, r2 = 0;
            const int rBatch = 11;
            int read = 0;

            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 1;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    for(int i =0; i < wCount * wBatch; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[wBatch];
                    for(int i = 0; i<wBatch; i++)
                        buffer[i] = 1;

                    for(int i =0; i < wCount; i++)
                        queue.EnqueueRange(buffer,0,wBatch);
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
                },
                /*() => {
                    int temp;
                    int count = 0;
                    while (read < total)
                        if (queue.TryDequeue(out temp)) {
                            count++;
                            r1 += temp;
                            Interlocked.Increment(ref read);
                        }
                    Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
                },*/
                /*() => {
                    var buffer = new int[rBatch];
                    var count = 0;
                    while(read < total) {
                        int actual;
                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
                            for(int i=0; i< actual; i++)
                                r1 += buffer[i];
                            Interlocked.Add(ref read, actual);
                            count += actual;
                        }
                    }

                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
                },*/
                () => {
                    var count = 0;
                    while(read < total) {
                        var buffer = queue.Drain();
                        for(int i=0; i< buffer.Length; i++)
                            r1 += buffer[i];
                            Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
                },
                () => {
                    var count = 0;
                    while(read < total) {
                        var buffer = queue.Drain();
                        for(int i=0; i< buffer.Length; i++)
                            r2 += buffer[i];
                        Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(summ , r1 + r2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                r1,
                r2,
                r1 + r2,
                total
            );
        }

        [TestMethod]
        public void ParallelMapTest() {

            const int count = 100000;

            var args = new double[count];
            var rand = new Random();

            for (int i = 0; i < count; i++)
                args[i] = rand.NextDouble();

            var t = Environment.TickCount;
            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();

            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);

            t = Environment.TickCount;
            for (int i = 0; i < count; i++)
                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
        }

        [TestMethod]
        public void ChainedMapTest() {

            using (var pool = new WorkerPool()) {
                const int count = 10000;

                var args = new double[count];
                var rand = new Random();

                for (int i = 0; i < count; i++)
                    args[i] = rand.NextDouble();

                var t = Environment.TickCount;
                var res = args
                    .ChainedMap(
                        // Analysis disable once AccessToDisposedClosure
                        x => pool.Invoke(
                            () => Math.Sin(x * x)
                        ),
                        4
                    )
                    .Join();

                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);

                t = Environment.TickCount;
                for (int i = 0; i < count; i++)
                    Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
            }
        }

        [TestMethod]
        public void ParallelForEachTest() {

            const int count = 100000;

            var args = new int[count];
            var rand = new Random();

            for (int i = 0; i < count; i++)
                args[i] = (int)(rand.NextDouble() * 100);

            int result = 0;

            var t = Environment.TickCount;
            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();

            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);

            int result2 = 0;

            t = Environment.TickCount;
            for (int i = 0; i < count; i++)
                result2 += args[i];
            Assert.AreEqual(result2, result);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
        }

        [TestMethod]
        public void ComplexCase1Test() {
            var flags = new bool[3];

            // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)

            var step1 = PromiseHelper
                .Sleep(200, "Alan")
                .On(() => flags[0] = true, PromiseEventType.Cancelled);
            var p = step1
                .Chain(x =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + x)
                        .Then(y => y)
                        .On(() => flags[1] = true, PromiseEventType.Cancelled)
                )
                .On(() => flags[2] = true, PromiseEventType.Cancelled);
            step1.Join();
            p.Cancel();
            try {
                Assert.AreEqual(p.Join(), "Hi, Alan");
                Assert.Fail("Shouldn't get here");
            } catch (OperationCanceledException) {
            }

            Assert.IsFalse(flags[0]);
            Assert.IsTrue(flags[1]);
            Assert.IsTrue(flags[2]);
        }

        [TestMethod]
        public void ChainedCancel1Test() {
            // при отмене сцепленной асинхронной операции все обещание должно
            // завершаться ошибкой OperationCanceledException
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Then(x => {
                    // запускаем две асинхронные операции
                    var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
                    // вторая операция отменяет первую до завершения
                    PromiseHelper
                        .Sleep(100, "HAL, STOP!")
                        .Then(result.Cancel);
                    return result;
                });
            try {
                p.Join();
            } catch (TargetInvocationException err) {
                Assert.IsTrue(err.InnerException is OperationCanceledException);
            }
        }

        [TestMethod]
        public void ChainedCancel2Test() {
            // при отмене цепочки обещаний, вложенные операции также должны отменяться
            var pSurvive = new Promise<bool>();
            var hemStarted = new Signal();
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Chain(() => {
                    hemStarted.Set();
                    // запускаем две асинхронные операции
                    var result = PromiseHelper
                        .Sleep(2000, "HEM ENABLED!!!")
                        .Then(() => pSurvive.Resolve(false));

                    result
                        .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);

                    return result;
                });

            hemStarted.Wait();
            p.Cancel();

            try {
                p.Join();
                Assert.Fail();
            } catch (OperationCanceledException) {
            }
            Assert.IsTrue(pSurvive.Join());
        }

        [TestMethod]
        public void SharedLockTest() {
            var l = new SharedLock();
            int shared = 0;
            int exclusive = 0;
            var s1 = new Signal();
            var log = new AsyncQueue<string>();

            try {
                AsyncPool.RunThread(
                    () => {
                        log.Enqueue("Reader #1 started");
                        try {
                            l.LockShared();
                            log.Enqueue("Reader #1 lock got");
                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #1 finished");
                            Interlocked.Decrement(ref shared);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #1 lock released");
                        }
                    },
                    () => {
                        log.Enqueue("Reader #2 started");

                        try {
                            l.LockShared();
                            log.Enqueue("Reader #2 lock got");

                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #2 upgrading to writer");
                            Interlocked.Decrement(ref shared);
                            l.Upgrade();
                            log.Enqueue("Reader #2 upgraded");

                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Assert.AreEqual(0, shared);
                            log.Enqueue("Reader #2 finished");
                            Interlocked.Decrement(ref exclusive);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #2 lock released");
                        }
                    },
                    () => {
                        log.Enqueue("Writer #1 started");
                        try {
                            l.LockExclusive();
                            log.Enqueue("Writer #1 got the lock");
                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Interlocked.Decrement(ref exclusive);
                            log.Enqueue("Writer #1 is finished");
                        } finally {
                            l.Release();
                            log.Enqueue("Writer #1 lock released");
                        }
                    }
                ).Bundle().Join(1000);
                log.Enqueue("Done");
            } catch(Exception error) {
                log.Enqueue(error.Message);
                throw;
            } finally {
                foreach (var m in log)
                    Console.WriteLine(m);
            }
        }

        #if NET_4_5

        [TestMethod]
        public async void TaskInteropTest() {
            var promise = new Promise<int>();
            promise.Resolve(10);
            var res = await promise;

            Assert.AreEqual(10, res);
        }

        #endif
    }
}