view Implab.Test/AsyncTests.cs @ 209:a867536c68fc v2

Bound promise to CancellationToken Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise
author cin
date Wed, 16 Nov 2016 03:06:08 +0300
parents 8200ab154c8a
children d6fe09f5592c
line wrap: on
line source

using System;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

#if MONO

using NUnit.Framework;
using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
using TestMethodAttribute = NUnit.Framework.TestAttribute;

#else

using Microsoft.VisualStudio.TestTools.UnitTesting;

#endif

namespace Implab.Test {
    [TestClass]
    public class AsyncTests {
        /// <summary>
        /// Attaches a success handler to a pending promise, resolves the promise
        /// with 100 and checks that the handler has stored that value by the time
        /// <c>Resolve</c> returns.
        /// </summary>
        [TestMethod]
        public void ResolveTest() {
            var promise = new Promise<int>();
            int received = -1; // sentinel: stays -1 if the handler is never called

            promise.Then(x => received = x);
            promise.Resolve(100);

            Assert.AreEqual(100, received);
        }

        /// <summary>
        /// Rejects a promise and checks that only the error handler runs: the
        /// success handler must leave <c>res</c> untouched and the error handler
        /// must receive the original exception.
        /// </summary>
        [TestMethod]
        public void RejectTest() {
            int res = -1;
            Exception err = null;

            var p = new Promise<int>();
            p.Then(
                x => res = x,
                e => {
                    err = e;
                    return -2;
                }
            );
            p.Reject(new ApplicationException("error"));

            // Expected value goes first so that failure messages read correctly.
            Assert.AreEqual(-1, res);
            // Fail with a clear message instead of a NullReferenceException
            // when the error handler was never invoked.
            Assert.IsNotNull(err, "The error handler was not invoked");
            Assert.AreEqual("error", err.Message);
        }

        /// <summary>
        /// Cancels a promise, attaches a cancellation handler that throws, and
        /// expects <c>Join()</c> on the derived promise to throw an
        /// <see cref="ApplicationException"/> whose inner exception carries the
        /// message thrown by the handler.
        /// </summary>
        [TestMethod]
        public void CancelExceptionTest() {
            var source = new Promise<bool>();
            source.CancelOperation(null);

            // No error handler (null); the cancellation handler always throws.
            var derived = source.Then(x => x, null, reason => {
                throw new ApplicationException("CANCELLED");
            });

            try {
                derived.Join();
                Assert.Fail();
            } catch (ApplicationException err) {
                Assert.AreEqual("CANCELLED", err.InnerException.Message);
            }
        }

        /// <summary>
        /// Checks that a chain can continue after a cancellation handler throws:
        /// the next link's error handler maps the failure to <c>true</c>, which is
        /// what <c>Join()</c> must return.
        /// </summary>
        [TestMethod]
        public void ContinueOnCancelTest() {
            var source = new Promise<bool>();
            source.CancelOperation(null);

            // First link: the cancellation handler throws.
            var failing = source.Then(x => x, null, reason => {
                throw new ApplicationException("CANCELLED");
            });

            // Second link: any error is converted into the value 'true'.
            var recovered = failing.Then(x => x, e => true);

            Assert.AreEqual(true, recovered.Join());
        }

        /// <summary>
        /// Checks that <c>Join()</c> on an already resolved promise returns the
        /// resolved value.
        /// </summary>
        [TestMethod]
        public void JoinSuccessTest() {
            var p = new Promise<int>();
            p.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual(100, p.Join());
        }

        /// <summary>
        /// Checks that <c>Join()</c> on a rejected promise throws a
        /// <see cref="TargetInvocationException"/> wrapping the rejection reason.
        /// </summary>
        [TestMethod]
        public void JoinFailTest() {
            var p = new Promise<int>();
            p.Reject(new ApplicationException("failed"));

            try {
                p.Join();
            } catch (TargetInvocationException err) {
                Assert.AreEqual("failed", err.InnerException.Message);
                return;
            } catch (Exception err) {
                Assert.Fail("Got wrong exception: " + err.GetType());
            }

            // Reported outside the try block so that "Join() did not throw" is
            // not mistaken for "Join() threw the wrong exception type".
            Assert.Fail("Join() was expected to throw for a rejected promise");
        }

        /// <summary>
        /// Checks that <c>Then</c> with a mapping function produces a promise
        /// whose result is the mapped value of the source promise.
        /// </summary>
        [TestMethod]
        public void MapTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x.ToString());
            p.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual("100", p2.Join());
        }

        /// <summary>
        /// Checks that an error handler can recover from a rejection: the derived
        /// promise must yield the handler's replacement value.
        /// </summary>
        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x, e => 101);

            p.Reject(new Exception());

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual(101, p2.Join());
        }

        /// <summary>
        /// Checks that <c>Chain</c> forwards the result of the promise returned
        /// by the chained function to the resulting promise.
        /// </summary>
        [TestMethod]
        public void ChainTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                // The inner promise is already resolved when it is returned.
                var p2 = new Promise<string>();
                p2.Resolve(x.ToString());
                return p2;
            });

            p1.Resolve(100);

            // Assert.AreEqual takes (expected, actual).
            Assert.AreEqual("100", p3.Join());
        }

        /// <summary>
        /// Chains an operation whose inner promise is rejected and asserts that
        /// <c>IsResolved</c> is true on the chained promise once the outer promise
        /// has been resolved.
        /// </summary>
        [TestMethod]
        public void ChainFailTest() {
            var outer = new Promise<int>();

            var chained = outer.Chain(x => {
                // The inner promise is already rejected when it is returned.
                var inner = new Promise<string>();
                inner.Reject(new Exception("DIE!!!"));
                return inner;
            });

            outer.Resolve(100);

            // NOTE(review): presumably IsResolved means "completed in any way",
            // not "completed successfully" - confirm against the promise API.
            Assert.IsTrue(chained.IsResolved);
        }

        /// <summary>
        /// Checks that work submitted through <c>AsyncPool.Invoke</c> runs on a
        /// thread other than the calling one, by comparing managed thread ids.
        /// </summary>
        [TestMethod]
        public void PoolTest() {
            var callerThreadId = Thread.CurrentThread.ManagedThreadId;

            var workerThreadId = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);

            Assert.AreNotEqual(callerThreadId, workerThreadId.Join());
        }

        /// <summary>
        /// Checks how <c>PoolSize</c> changes under load for a pool created with
        /// the arguments (5, 10, 1): it must be 5 initially, still 5 after three
        /// blocking jobs, and 10 after a further hundred blocking jobs.
        /// </summary>
        [TestMethod]
        public void WorkerPoolSizeTest() {
            // 'using' guarantees the pool is disposed even when an assertion
            // fails; otherwise the workers would stay blocked in their sleeps.
            using (var pool = new WorkerPool(5, 10, 1)) {
                Assert.AreEqual(5, pool.PoolSize);

                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

                Assert.AreEqual(5, pool.PoolSize);

                for (int i = 0; i < 100; i++)
                    pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

                // Give the pool some time to react before checking its size.
                Thread.Sleep(200);
                Assert.AreEqual(10, pool.PoolSize);
            }
        }

        /// <summary>
        /// Submits 1000 small jobs to a worker pool, each followed by two
        /// continuations, and checks that every job contributed exactly once to
        /// the shared counter.
        /// </summary>
        [TestMethod]
        public void WorkerPoolCorrectTest() {
            // 'using' disposes the pool even when the assertion below fails.
            using (var pool = new WorkerPool(0, 1000, 100)) {
                const int iterations = 1000;
                int pending = iterations;
                var stop = new ManualResetEvent(false);

                var count = 0;
                for (int i = 0; i < iterations; i++) {
                    pool
                        .Invoke(() => 1)
                        .Then(x => Interlocked.Add(ref count, x))
                        .Then(x => Math.Log10(x))
                        .On(() => {
                            // Use the value returned by the atomic decrement
                            // rather than re-reading the shared counter.
                            if (Interlocked.Decrement(ref pending) == 0)
                                stop.Set();
                        }, PromiseEventType.All);
                }

                stop.WaitOne();

                Assert.AreEqual(iterations, count);
                Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// Checks that disposing a worker pool brings <c>PoolSize</c> down to zero
        /// within half a second, and then calls <c>Dispose()</c> a second time.
        /// </summary>
        [TestMethod]
        public void WorkerPoolDisposeTest() {
            var workers = new WorkerPool(5, 20);
            Assert.AreEqual(5, workers.PoolSize);

            workers.Dispose();

            // Allow the pool some time to shut down before checking its size.
            Thread.Sleep(500);
            Assert.AreEqual(0, workers.PoolSize);

            // Second Dispose() call on the same instance.
            workers.Dispose();
        }

        /// <summary>
        /// Exercises <c>MTQueue</c>: first single-threaded (one item, then 1000
        /// items in FIFO order), then with 10 writer threads and 10 reader
        /// threads, checking that every enqueued item is dequeued exactly once.
        /// </summary>
        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // A failed dequeue must be reported as such instead of comparing
                // a stale value of 'res'.
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            const int itemsPerWriter = 10000;
            const int writersCount = 10;
            const int readersCount = 10;

            // Both counters start at their final value, so neither can reach
            // zero before all threads of that kind have been started.
            int writers = writersCount;
            int readers = readersCount;
            var stop = new ManualResetEvent(false);
            int total = 0;

            for (int i = 0; i < writersCount; i++) {
                AsyncPool
                    .RunThread(() => {
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                        }
                        return 1;
                    })
                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
            }

            for (int i = 0; i < readersCount; i++) {
                AsyncPool
                    .RunThread(() => {
                        int t;
                        bool writersActive;
                        do {
                            // Sample the writer count BEFORE draining. If it was
                            // checked after the drain, the last items could be
                            // enqueued between "queue is empty" and "no writers
                            // left", and the reader would exit leaving them behind.
                            writersActive = writers > 0;
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                        } while (writersActive);
                        return 1;
                    })
                    .On(() => {
                        // Use the value returned by the atomic decrement rather
                        // than re-reading the shared counter.
                        if (Interlocked.Decrement(ref readers) == 0)
                            stop.Set();
                    }, PromiseEventType.All);
            }

            stop.WaitOne();

            Assert.AreEqual(itemsPerWriter * writersCount, total);
        }

        /// <summary>
        /// Exercises <c>AsyncQueue</c>: first single-threaded (one item, then
        /// 1000 items in FIFO order), then with two writers enqueuing 1s and 2s
        /// and two readers that each dequeue exactly <c>count</c> items. The sum
        /// of everything read must equal <c>count * 3</c>.
        /// </summary>
        [TestMethod]
        public void AsyncQueueTest() {
            var queue = new AsyncQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // A failed dequeue must be reported as such instead of comparing
                // a stale value of 'res'.
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            const int count = 10000000;

            // Each accumulator is written by exactly one reader thread and is
            // read only after all threads have been joined.
            int res1 = 0, res2 = 0;
            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(2);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp;
                    int i = 0;
                    // Spin until this reader has consumed its share of items.
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res1 += temp;
                        }
                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    int temp;
                    int i = 0;
                    // Spin until this reader has consumed its share of items.
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res2 += temp;
                        }
                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                }
            )
                .PromiseAll()
                .Join();

            Assert.AreEqual(count * 3, res1 + res2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                res1,
                res2,
                res1 + res2,
                count
            );
        }

        /// <summary>
        /// Stress test for the range operations of <c>AsyncQueue</c>: two writers
        /// enqueue batches of 29 items (all 1s and all 2s respectively) while two
        /// readers dequeue ranges of up to 111 items. The sum of everything read
        /// must match the sum of everything written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueBatchTest() {
            var queue = new AsyncQueue<int>();

            const int writeBatch = 29;
            const int batchesPerWriter = 400000;
            const int readBatch = 111;
            // Two writers produce the items; their values (1 and 2) add up to 3
            // per pair of items.
            const int expectedItems = writeBatch * batchesPerWriter * 2;
            const int expectedSum = writeBatch * batchesPerWriter * 3;

            int sum1 = 0, sum2 = 0;
            // Number of items consumed so far by both readers together.
            int itemsRead = 0;

            var started = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    // Writer #1: batches filled with 1.
                    var ones = new int[writeBatch];
                    for (int k = 0; k < writeBatch; k++) {
                        ones[k] = 1;
                    }

                    for (int k = 0; k < batchesPerWriter; k++) {
                        queue.EnqueueRange(ones, 0, writeBatch);
                    }
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // Writer #2: batches filled with 2.
                    var twos = new int[writeBatch];
                    for (int k = 0; k < writeBatch; k++) {
                        twos[k] = 2;
                    }

                    for (int k = 0; k < batchesPerWriter; k++) {
                        queue.EnqueueRange(twos, 0, writeBatch);
                    }
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // Reader #1: accumulates into sum1.
                    var chunk = new int[readBatch];

                    while (itemsRead < expectedItems) {
                        int got;
                        if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++) {
                            sum1 += chunk[k];
                        }
                        Interlocked.Add(ref itemsRead, got);
                    }

                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // Reader #2: accumulates into sum2.
                    var chunk = new int[readBatch];

                    while (itemsRead < expectedItems) {
                        int got;
                        if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++) {
                            sum2 += chunk[k];
                        }
                        Interlocked.Add(ref itemsRead, got);
                    }

                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
                }
            )
                .PromiseAll()
                .Join();

            Assert.AreEqual(expectedSum, sum1 + sum2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - started,
                sum1,
                sum2,
                sum1 + sum2,
                expectedItems
            );
        }

        /// <summary>
        /// Stress test for <c>AsyncQueue.TryDequeueChunk</c>: three writers enqueue
        /// batches of 31 items (all 1s, all 2s and all 3s) while a single reader
        /// dequeues chunks of up to 1024 items and tracks the average chunk size.
        /// The sum of everything read must match the sum of everything written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueChunkDequeueTest() {
            var queue = new AsyncQueue<int>();

            const int writeBatch = 31;
            const int batchesPerWriter = 200000;
            const int readBatch = 1024;
            // Three writers; their values (1, 2 and 3) add up to 6.
            const int expectedItems = writeBatch * batchesPerWriter * 3;
            const int expectedSum = writeBatch * batchesPerWriter * 6;

            // sum1 is never written (there is no reader #1); it is kept only
            // because it is part of the final assertion and report.
            int sum1 = 0, sum2 = 0;
            int itemsRead = 0;

            var started = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    // Writer #1: batches filled with 1.
                    var ones = new int[writeBatch];
                    for (int k = 0; k < writeBatch; k++) {
                        ones[k] = 1;
                    }

                    for (int k = 0; k < batchesPerWriter; k++) {
                        queue.EnqueueRange(ones, 0, writeBatch);
                    }
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // Writer #2: batches filled with 2.
                    var twos = new int[writeBatch];
                    for (int k = 0; k < writeBatch; k++) {
                        twos[k] = 2;
                    }

                    for (int k = 0; k < batchesPerWriter; k++) {
                        queue.EnqueueRange(twos, 0, writeBatch);
                    }
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // Writer #3: batches filled with 3.
                    var threes = new int[writeBatch];
                    for (int k = 0; k < writeBatch; k++) {
                        threes[k] = 3;
                    }

                    for (int k = 0; k < batchesPerWriter; k++) {
                        queue.EnqueueRange(threes, 0, writeBatch);
                    }
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
                },
                () => {
                    // The only reader; it reports itself as "reader #2".
                    var chunk = new int[readBatch];
                    int count = 1;
                    double avgchunk = 0;

                    while (itemsRead < expectedItems) {
                        int got;
                        if (!queue.TryDequeueChunk(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++) {
                            sum2 += chunk[k];
                        }
                        Interlocked.Add(ref itemsRead, got);

                        // Incremental running mean of the chunk sizes seen so far.
                        avgchunk = avgchunk*(count-1)/count + got/(double)count;
                        count++;
                    }

                    Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, avgchunk);
                }
            )
                .PromiseAll()
                .Join();

            Assert.AreEqual(expectedSum, sum1 + sum2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - started,
                sum1,
                sum2,
                sum1 + sum2,
                expectedItems
            );
        }

        /// <summary>
        /// Stress test for <c>AsyncQueue.Drain</c>: three writers enqueue 1s (two
        /// of them in batches of 11, one item by item) while two readers
        /// repeatedly drain the queue. The sum of everything read must equal the
        /// number of items written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueDrainTest() {
            var queue = new AsyncQueue<int>();

            const int wBatch = 11;
            const int wCount = 200000;
            // Every item has the value 1, so the item count equals the sum.
            const int total = wBatch * wCount * 3;
            const int summ = wBatch * wCount * 3;

            int r1 = 0, r2 = 0;
            // Number of items consumed so far by both readers together.
            int read = 0;

            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                () => {
                    var buffer = new int[wBatch];
                    for (int i = 0; i < wBatch; i++)
                        buffer[i] = 1;

                    for (int i = 0; i < wCount; i++)
                        queue.EnqueueRange(buffer, 0, wBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    // This writer enqueues the same number of items one by one.
                    for (int i = 0; i < wCount * wBatch; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var buffer = new int[wBatch];
                    for (int i = 0; i < wBatch; i++)
                        buffer[i] = 1;

                    for (int i = 0; i < wCount; i++)
                        queue.EnqueueRange(buffer, 0, wBatch);
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
                },
                () => {
                    var count = 0;
                    while (read < total) {
                        var buffer = queue.Drain();
                        for (int i = 0; i < buffer.Length; i++) {
                            r1 += buffer[i];
                        }
                        // Published once per drained buffer, not once per item.
                        Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
                },
                () => {
                    var count = 0;
                    while (read < total) {
                        var buffer = queue.Drain();
                        for (int i = 0; i < buffer.Length; i++) {
                            r2 += buffer[i];
                        }
                        // Published once per drained buffer, not once per item.
                        Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
                }
            )
                .PromiseAll()
                .Join();

            Assert.AreEqual(summ, r1 + r2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                r1,
                r2,
                r1 + r2,
                total
            );
        }

        /// <summary>
        /// Maps 100000 random doubles through <c>sin(x*x)</c> with
        /// <c>ParallelMap</c> (second argument 4) and verifies every result
        /// against the same computation done sequentially.
        /// </summary>
        [TestMethod]
        public void ParallelMapTest() {
            const int count = 100000;

            var args = new double[count];
            var rand = new Random();

            for (int idx = 0; idx < count; idx++) {
                args[idx] = rand.NextDouble();
            }

            var started = Environment.TickCount;
            var mapped = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

            // Verification pass, timed separately.
            started = Environment.TickCount;
            for (int idx = 0; idx < count; idx++) {
                Assert.AreEqual(Math.Sin(args[idx] * args[idx]), mapped[idx]);
            }
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        }

        /// <summary>
        /// Maps 10000 random doubles through <c>sin(x*x)</c> with
        /// <c>ChainedMap</c>, where each element is computed by a job submitted to
        /// a worker pool, and verifies every result against the same computation
        /// done sequentially.
        /// </summary>
        [TestMethod]
        public void ChainedMapTest() {
            using (var pool = new WorkerPool()) {
                const int count = 10000;

                var args = new double[count];
                var rand = new Random();

                for (int idx = 0; idx < count; idx++) {
                    args[idx] = rand.NextDouble();
                }

                var started = Environment.TickCount;
                var mapped = args
                    .ChainedMap(
                        // Analysis disable once AccessToDisposedClosure
                        x => pool.Invoke(
                            () => Math.Sin(x * x)
                        ),
                        4
                    )
                    .Join();
                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

                // Verification pass, timed separately.
                started = Environment.TickCount;
                for (int idx = 0; idx < count; idx++) {
                    Assert.AreEqual(Math.Sin(args[idx] * args[idx]), mapped[idx]);
                }
                Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// Sums 100000 random integers with <c>ParallelForEach</c> (second
        /// argument 4) using an atomic add, and compares the result with a plain
        /// sequential sum of the same array.
        /// </summary>
        [TestMethod]
        public void ParallelForEachTest() {
            const int count = 100000;

            var args = new int[count];
            var rand = new Random();

            for (int idx = 0; idx < count; idx++) {
                args[idx] = (int)(rand.NextDouble() * 100);
            }

            int result = 0;

            var started = Environment.TickCount;
            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - started, result);

            // Reference value computed sequentially.
            int result2 = 0;

            started = Environment.TickCount;
            foreach (var value in args) {
                result2 += value;
            }
            Assert.AreEqual(result2, result);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        }

        /// <summary>
        /// Builds a two-step asynchronous pipeline, waits for the first step to
        /// finish, then cancels the pipeline. <c>Join()</c> is expected to throw
        /// <see cref="OperationCanceledException"/>, and the cancellation handlers
        /// of the second step and of the pipeline (but not of the first step) are
        /// expected to have run.
        /// </summary>
        [TestMethod]
        public void ComplexCase1Test() {
            // cancelled[i] is set by the 'Cancelled' handler of:
            //   [0] the first step, [1] the second step, [2] the whole pipeline
            var cancelled = new bool[3];

            // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

            var first = PromiseHelper
                .Sleep(200, "Alan")
                .On(() => cancelled[0] = true, PromiseEventType.Cancelled);

            var pipeline = first
                .Chain(x =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + x)
                        .Then(y => y)
                        .On(() => cancelled[1] = true, PromiseEventType.Cancelled)
                )
                .On(() => cancelled[2] = true, PromiseEventType.Cancelled);

            // Let the first step complete before the pipeline is cancelled.
            first.Join();
            pipeline.Cancel();

            try {
                Assert.AreEqual(pipeline.Join(), "Hi, Alan");
                Assert.Fail("Shouldn't get here");
            } catch (OperationCanceledException) {
                // expected: the pipeline was cancelled
            }

            Assert.IsFalse(cancelled[0]);
            Assert.IsTrue(cancelled[1]);
            Assert.IsTrue(cancelled[2]);
        }

        /// <summary>
        /// Starts a slow operation from inside a <c>Then</c> handler and cancels
        /// it from a second, shorter operation. If <c>Join()</c> throws a
        /// <see cref="TargetInvocationException"/>, its inner exception must be an
        /// <see cref="OperationCanceledException"/>.
        /// </summary>
        [TestMethod]
        public void ChainedCancel1Test() {
            // when a chained asynchronous operation is cancelled, the whole promise
            // must complete with an OperationCanceledException error
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Then(x => {
                    // start two asynchronous operations
                    var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
                    // the second operation cancels the first one before it completes
                    PromiseHelper
                        .Sleep(100, "HAL, STOP!")
                        .Then(result.Cancel);
                    return result;
                });
            // NOTE(review): nothing fails the test when Join() returns normally or
            // throws a different exception type, so the expectation stated above
            // is only checked if a TargetInvocationException is actually thrown.
            try {
                p.Join();
            } catch (TargetInvocationException err) {
                Assert.IsTrue(err.InnerException is OperationCanceledException);
            }
        }

        /// <summary>
        /// Checks that cancelling a chain of promises also cancels the nested
        /// operation: the outer <c>Join()</c> must throw
        /// <see cref="OperationCanceledException"/> and the nested operation's
        /// cancellation handler must have resolved <c>pSurvive</c> with true.
        /// </summary>
        [TestMethod]
        public void ChainedCancel2Test() {
            // when a chain of promises is cancelled, nested operations must be
            // cancelled as well
            var pSurvive = new Promise<bool>();
            var innerStarted = new Signal();

            var outer = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Chain(() => {
                    innerStarted.Set();

                    // Nested operation: resolves pSurvive with false if it runs
                    // to completion...
                    var inner = PromiseHelper
                        .Sleep(2000, "HEM ENABLED!!!")
                        .Then(() => pSurvive.Resolve(false));

                    // ...and with true if it is cancelled.
                    inner.On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);

                    return inner;
                });

            // Cancel only after the nested operation has been started.
            innerStarted.Wait();
            outer.Cancel();

            try {
                outer.Join();
                Assert.Fail();
            } catch (OperationCanceledException) {
                // expected: the chain was cancelled
            }

            Assert.IsTrue(pSurvive.Join());
        }

        /// <summary>
        /// Exercises <c>SharedLock</c> with three threads: two readers that must
        /// hold the shared lock at the same time (one of which then upgrades to
        /// exclusive) and one writer taking the exclusive lock. The test asserts
        /// that at most one thread is ever inside an exclusive section and that
        /// no reader is counted while the upgraded lock is held. All threads must
        /// finish within one second; the collected log is printed at the end.
        /// </summary>
        [TestMethod]
        public void SharedLockTest() {
            var l = new SharedLock();
            int shared = 0;
            int exclusive = 0;
            // Released once both readers hold the shared lock simultaneously.
            var s1 = new Signal();
            var log = new AsyncQueue<string>();

            try {
                AsyncPool.RunThread(
                    () => {
                        log.Enqueue("Reader #1 started");
                        // Acquire outside the try block: if the acquisition itself
                        // fails, Release() must not be called for a lock that was
                        // never taken.
                        l.LockShared();
                        try {
                            log.Enqueue("Reader #1 lock got");
                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #1 finished");
                            Interlocked.Decrement(ref shared);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #1 lock released");
                        }
                    },
                    () => {
                        log.Enqueue("Reader #2 started");

                        // Acquired outside the try block, see reader #1.
                        l.LockShared();
                        try {
                            log.Enqueue("Reader #2 lock got");

                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #2 upgrading to writer");
                            Interlocked.Decrement(ref shared);
                            l.Upgrade();
                            log.Enqueue("Reader #2 upgraded");

                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Assert.AreEqual(0, shared);
                            log.Enqueue("Reader #2 finished");
                            Interlocked.Decrement(ref exclusive);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #2 lock released");
                        }
                    },
                    () => {
                        log.Enqueue("Writer #1 started");
                        // Acquired outside the try block, see reader #1.
                        l.LockExclusive();
                        try {
                            log.Enqueue("Writer #1 got the lock");
                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Interlocked.Decrement(ref exclusive);
                            log.Enqueue("Writer #1 is finished");
                        } finally {
                            l.Release();
                            log.Enqueue("Writer #1 lock released");
                        }
                    }
                ).PromiseAll().Join(1000);
                log.Enqueue("Done");
            } catch(Exception error) {
                log.Enqueue(error.Message);
                throw;
            } finally {
                // Always dump the log; it shows the interleaving on failure.
                foreach (var m in log)
                    Console.WriteLine(m);
            }
        }

        #if NET_4_5

        /// <summary>
        /// Checks that a resolved <c>Promise&lt;int&gt;</c> can be awaited and
        /// yields its value.
        /// </summary>
        /// <remarks>
        /// Returns a task instead of being <c>async void</c> so that the test
        /// runner can wait for completion and observe assertion failures raised
        /// after the <c>await</c>.
        /// </remarks>
        [TestMethod]
        public async System.Threading.Tasks.Task TaskInteropTest() {
            var promise = new Promise<int>();
            promise.Resolve(10);
            var res = await promise;

            Assert.AreEqual(10, res);
        }

        #endif
    }
}