view Implab.Test/AsyncTests.cs @ 144:8c0b95069066 v2

DRAFT: refactoring
author cin
date Fri, 06 Mar 2015 15:45:26 +0300
parents f75cfa58e3d4
children 706fccb85524
line wrap: on
line source

using System;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

#if MONO

using NUnit.Framework;
using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
using TestMethod = NUnit.Framework.TestAttribute;

#else

using Microsoft.VisualStudio.TestTools.UnitTesting;

#endif

namespace Implab.Test {
    [TestClass]
    public class AsyncTests {
        /// <summary>
        /// A success handler attached before resolution must have received the
        /// resolved value by the time <c>Resolve</c> returns.
        /// </summary>
        [TestMethod]
        public void ResolveTest() {
            var observed = -1;
            var promise = new Promise<int>();

            promise.Then(value => observed = value);
            promise.Resolve(100);

            Assert.AreEqual(100, observed);
        }

        /// <summary>
        /// Rejecting a promise must skip the success handler and hand the
        /// rejection reason to the error handler.
        /// </summary>
        [TestMethod]
        public void RejectTest() {
            int res = -1;
            Exception err = null;

            var p = new Promise<int>();
            p.Then(
                x => res = x,
                e => {
                    err = e;
                    return -2;
                }
            );
            p.Reject(new ApplicationException("error"));

            // The expected value goes first so failure messages read correctly.
            Assert.AreEqual(-1, res);
            // Fail with an assertion, not a NullReferenceException, when the
            // error handler was never invoked.
            Assert.IsNotNull(err);
            Assert.AreEqual("error", err.Message);
        }

        /// <summary>
        /// The cancellation handler of an already cancelled promise throws; joining
        /// the derived promise is expected to raise an <see cref="ApplicationException"/>
        /// whose inner exception carries the handler's message.
        /// </summary>
        [TestMethod]
        public void CancelExceptionTest() {
            var source = new Promise<bool>();
            source.Cancel();

            var derived = source.Then(
                x => x,
                null,
                reason => {
                    throw new ApplicationException("CANCELLED");
                }
            );

            try {
                derived.Join();
                Assert.Fail();
            } catch (ApplicationException failure) {
                var cause = failure.InnerException;
                Assert.AreEqual("CANCELLED", cause.Message);
            }
        }

        /// <summary>
        /// An exception thrown from a cancellation handler must be recoverable by
        /// the error handler of the next link in the chain.
        /// </summary>
        [TestMethod]
        public void ContinueOnCancelTest() {
            var source = new Promise<bool>();
            source.Cancel();

            // first link: the cancellation handler fails
            var failing = source.Then<bool>(x => x, null, reason => {
                throw new ApplicationException("CANCELLED");
            });

            // second link: the error handler turns the failure into 'true'
            var recovered = failing.Then(x => x, e => true);

            Assert.AreEqual(true, recovered.Join());
        }

        /// <summary>
        /// Joining an already resolved promise returns its value.
        /// </summary>
        [TestMethod]
        public void JoinSuccessTest() {
            var p = new Promise<int>();
            p.Resolve(100);

            // expected first, actual second
            Assert.AreEqual(100, p.Join());
        }

        /// <summary>
        /// Joining a rejected promise must throw a <see cref="TargetInvocationException"/>
        /// that wraps the original rejection reason.
        /// </summary>
        [TestMethod]
        public void JoinFailTest() {
            var p = new Promise<int>();
            p.Reject(new ApplicationException("failed"));

            // Capture whatever Join() throws and assert afterwards, so that
            // "nothing thrown" and "wrong type thrown" are reported separately.
            Exception caught = null;
            try {
                p.Join();
            } catch (Exception e) {
                caught = e;
            }

            Assert.IsNotNull(caught, "Join() was expected to throw");
            Assert.IsTrue(caught is TargetInvocationException, "Got wrong exception");
            Assert.IsNotNull(caught.InnerException);
            Assert.AreEqual("failed", caught.InnerException.Message);
        }

        /// <summary>
        /// <c>Then</c> with a mapping function produces a promise of the mapped value.
        /// </summary>
        [TestMethod]
        public void MapTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x.ToString());
            p.Resolve(100);

            // expected first, actual second
            Assert.AreEqual("100", p2.Join());
        }

        /// <summary>
        /// An error handler that returns a value turns a rejection into a
        /// successful result on the derived promise.
        /// </summary>
        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Then(x => x, e => 101);

            p.Reject(new Exception());

            // expected first, actual second
            Assert.AreEqual(101, p2.Join());
        }

        /// <summary>
        /// <c>Chain</c> continues with the promise returned by the callback and
        /// exposes that promise's result.
        /// </summary>
        [TestMethod]
        public void ChainTest() {
            var p1 = new Promise<int>();

            var p3 = p1.Chain(x => {
                var p2 = new Promise<string>();
                p2.Resolve(x.ToString());
                return p2;
            });

            p1.Resolve(100);

            // expected first, actual second
            Assert.AreEqual("100", p3.Join());
        }

        /// <summary>
        /// Chains an outer promise to an inner promise that is rejected
        /// immediately, then checks the state flag of the chained promise.
        /// </summary>
        [TestMethod]
        public void ChainFailTest() {
            var outer = new Promise<int>();

            var chained = outer.Chain(value => {
                var inner = new Promise<string>();
                inner.Reject(new Exception("DIE!!!"));
                return inner;
            });

            outer.Resolve(100);

            // NOTE(review): the inner promise is rejected, yet IsResolved is expected
            // to be true - presumably it means "completed in any state"; confirm
            // against the Promise implementation.
            Assert.IsTrue(chained.IsResolved);
        }

        /// <summary>
        /// Work passed to <c>AsyncPool.Invoke</c> must run on a thread other than
        /// the calling one.
        /// </summary>
        [TestMethod]
        public void PoolTest() {
            var callerThreadId = Thread.CurrentThread.ManagedThreadId;

            var workerThreadId = AsyncPool
                .Invoke(() => Thread.CurrentThread.ManagedThreadId)
                .Join();

            Assert.AreNotEqual(callerThreadId, workerThreadId);
        }

        /// <summary>
        /// The pool starts with 5 workers, stays at 5 while only a few tasks are
        /// queued, and grows to 10 once it is flooded with blocking tasks.
        /// </summary>
        [TestMethod]
        public void WorkerPoolSizeTest() {
            // 'using' guarantees that the workers, which block in very long sleeps,
            // are released even when one of the assertions below fails.
            // NOTE(review): the meaning of the third constructor argument is not
            // visible from this file.
            using (var pool = new WorkerPool(5, 10, 1)) {
                Assert.AreEqual(5, pool.PoolSize);

                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

                Assert.AreEqual(5, pool.PoolSize);

                for (int i = 0; i < 100; i++) {
                    pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
                }

                // give the pool some time to start additional workers
                Thread.Sleep(200);
                Assert.AreEqual(10, pool.PoolSize);
            }
        }

        /// <summary>
        /// Runs 1000 small promise chains on a worker pool and checks that every
        /// one of them contributed exactly once to the shared counter.
        /// </summary>
        [TestMethod]
        public void WorkerPoolCorrectTest() {
            const int iterations = 1000;
            int pending = iterations;
            var stop = new ManualResetEvent(false);
            var count = 0;

            // 'using' disposes the pool even when the assertion fails.
            using (var pool = new WorkerPool(0, 1000, 100)) {
                for (int i = 0; i < iterations; i++) {
                    pool
                        .Invoke(() => 1)
                        .Then(x => Interlocked.Add(ref count, x))
                        .Then(x => Math.Log10(x))
                        .On(() => {
                            // Decide on the value returned by the atomic decrement;
                            // re-reading the shared counter afterwards is racy.
                            if (Interlocked.Decrement(ref pending) == 0)
                                stop.Set();
                        }, PromiseEventType.All);
                }

                stop.WaitOne();

                Assert.AreEqual(iterations, count);
                Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// After <c>Dispose</c> the pool size must drop to zero, and a second
        /// <c>Dispose</c> call must be harmless.
        /// </summary>
        [TestMethod]
        public void WorkerPoolDisposeTest() {
            var workers = new WorkerPool(5, 20);
            Assert.AreEqual(5, workers.PoolSize);

            workers.Dispose();

            // leave the worker threads some time to terminate
            Thread.Sleep(500);
            Assert.AreEqual(0, workers.PoolSize);

            // disposing twice must not throw
            workers.Dispose();
        }

        /// <summary>
        /// Checks MTQueue sequentially (FIFO order, empty-queue behaviour) and then
        /// with ten writer and ten reader threads: every enqueued item must be
        /// dequeued exactly once.
        /// </summary>
        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // the dequeue itself must succeed, not only yield the right value
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            const int itemsPerWriter = 10000;
            const int writersCount = 10;
            const int readersCount = 10;

            // Both counters are set up front. Incrementing them while threads are
            // already being started lets a fast reader bring 'readers' to zero, and
            // signal 'stop', before the remaining readers are launched.
            int writers = writersCount;
            int readers = readersCount;
            var stop = new ManualResetEvent(false);
            int total = 0;

            for (int i = 0; i < writersCount; i++) {
                AsyncPool
                    .RunThread(() => {
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                        }
                        return 1;
                    })
                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
            }

            for (int i = 0; i < readersCount; i++) {
                AsyncPool
                    .RunThread(() => {
                        int t;
                        bool writersActive;
                        do {
                            // Sample the writers counter BEFORE draining. If no writer
                            // was active at that point, every item is already queued
                            // and the drain below cannot miss a late one.
                            writersActive = Interlocked.CompareExchange(ref writers, 0, 0) > 0;
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                        } while (writersActive);
                        return 1;
                    })
                    .On(() => {
                        // use the result of the atomic decrement so that exactly one
                        // reader signals completion
                        if (Interlocked.Decrement(ref readers) == 0)
                            stop.Set();
                    }, PromiseEventType.All);
            }

            stop.WaitOne();

            Assert.AreEqual(itemsPerWriter * writersCount, total);
        }

        /// <summary>
        /// Checks AsyncQueue sequentially (FIFO order, empty-queue behaviour) and
        /// then with two writers and two readers running concurrently; the sum of
        /// everything read must match the sum of everything written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueTest() {
            var queue = new AsyncQueue<int>();
            int res;

            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // the dequeue itself must succeed, not only yield the right value
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            const int count = 10000000;

            // each accumulator is written by exactly one reader thread and is read
            // only after Join() below
            int res1 = 0, res2 = 0;
            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                // writer #1: 'count' items of value 1
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                // writer #2: 'count' items of value 2
                () => {
                    for (var i = 0; i < count; i++)
                        queue.Enqueue(2);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                // reader #1: spins until it has taken 'count' items
                () => {
                    int temp;
                    int i = 0;
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res1 += temp;
                        }
                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                },
                // reader #2: spins until it has taken 'count' items
                () => {
                    int temp;
                    int i = 0;
                    while (i < count)
                        if (queue.TryDequeue(out temp)) {
                            i++;
                            res2 += temp;
                        }
                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                }
            )
                .Bundle()
                .Join();

            // count * 1 from writer #1 plus count * 2 from writer #2
            Assert.AreEqual(count * 3, res1 + res2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                res1,
                res2,
                res1 + res2,
                count
            );
        }

        /// <summary>
        /// Two writers push fixed-size batches of 1s and 2s with <c>EnqueueRange</c>
        /// while two readers pull with <c>TryDequeueRange</c>; the sum of everything
        /// read must equal the sum of everything written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueBatchTest() {
            const int writeBatch = 29;
            const int batchesPerWriter = 400000;
            const int readBatch = 111;
            const int expectedItems = writeBatch * batchesPerWriter * 2;
            const int expectedSum = writeBatch * batchesPerWriter * 3;

            var queue = new AsyncQueue<int>();

            // per-reader sums and the shared count of consumed items
            int sum1 = 0, sum2 = 0;
            int consumed = 0;

            var started = Environment.TickCount;

            AsyncPool.RunThread(
                // writer #1: batches filled with 1
                () => {
                    var ones = new int[writeBatch];
                    for (int k = 0; k < ones.Length; k++)
                        ones[k] = 1;

                    for (int n = batchesPerWriter; n > 0; n--)
                        queue.EnqueueRange(ones, 0, writeBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
                },
                // writer #2: batches filled with 2
                () => {
                    var twos = new int[writeBatch];
                    for (int k = 0; k < twos.Length; k++)
                        twos[k] = 2;

                    for (int n = batchesPerWriter; n > 0; n--)
                        queue.EnqueueRange(twos, 0, writeBatch);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
                },
                // reader #1: spins until all items have been consumed by someone
                () => {
                    var chunk = new int[readBatch];

                    while (consumed < expectedItems) {
                        int got;
                        if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++)
                            sum1 += chunk[k];
                        Interlocked.Add(ref consumed, got);
                    }

                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
                },
                // reader #2: same as reader #1 with its own accumulator
                () => {
                    var chunk = new int[readBatch];

                    while (consumed < expectedItems) {
                        int got;
                        if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++)
                            sum2 += chunk[k];
                        Interlocked.Add(ref consumed, got);
                    }

                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(expectedSum, sum1 + sum2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - started,
                sum1,
                sum2,
                sum1 + sum2,
                expectedItems
            );
        }

        /// <summary>
        /// Three writers push batches of 1s, 2s and 3s with <c>EnqueueRange</c>
        /// while a single reader pulls with <c>TryDequeueChunk</c> and tracks the
        /// running average chunk size; the sum read must equal the sum written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueChunkDequeueTest() {
            const int writeBatch = 31;
            const int batchesPerWriter = 200000;
            const int readBatch = 1024;
            const int expectedItems = writeBatch * batchesPerWriter * 3;
            const int expectedSum = writeBatch * batchesPerWriter * 6;

            var queue = new AsyncQueue<int>();

            // sum1 is never written by any thread in this test; it is kept only
            // for the report and the final sum
            int sum1 = 0, sum2 = 0;
            int consumed = 0;

            var started = Environment.TickCount;

            AsyncPool.RunThread(
                // writer #1: batches filled with 1
                () => {
                    var ones = new int[writeBatch];
                    for (int k = 0; k < ones.Length; k++)
                        ones[k] = 1;

                    for (int n = batchesPerWriter; n > 0; n--)
                        queue.EnqueueRange(ones, 0, writeBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
                },
                // writer #2: batches filled with 2
                () => {
                    var twos = new int[writeBatch];
                    for (int k = 0; k < twos.Length; k++)
                        twos[k] = 2;

                    for (int n = batchesPerWriter; n > 0; n--)
                        queue.EnqueueRange(twos, 0, writeBatch);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
                },
                // writer #3: batches filled with 3
                () => {
                    var threes = new int[writeBatch];
                    for (int k = 0; k < threes.Length; k++)
                        threes[k] = 3;

                    for (int n = batchesPerWriter; n > 0; n--)
                        queue.EnqueueRange(threes, 0, writeBatch);
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
                },
                // the only reader: spins until every item has been consumed
                () => {
                    var chunk = new int[readBatch];
                    int chunkNo = 1;
                    double avgChunk = 0;

                    while (consumed < expectedItems) {
                        int got;
                        if (!queue.TryDequeueChunk(chunk, 0, readBatch, out got))
                            continue;

                        for (int k = 0; k < got; k++)
                            sum2 += chunk[k];
                        Interlocked.Add(ref consumed, got);

                        // incremental update of the mean chunk size
                        avgChunk = avgChunk*(chunkNo-1)/chunkNo + got/(double)chunkNo;
                        chunkNo++;
                    }

                    Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, avgChunk);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(expectedSum, sum1 + sum2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - started,
                sum1,
                sum2,
                sum1 + sum2,
                expectedItems
            );
        }

        /// <summary>
        /// Three writers (two batching with <c>EnqueueRange</c>, one enqueuing item
        /// by item) fill the queue with 1s while two readers empty it with
        /// <c>Drain</c>; the sum of everything read must equal the number of items
        /// written.
        /// </summary>
        [TestMethod]
        public void AsyncQueueDrainTest() {
            var queue = new AsyncQueue<int>();

            const int wBatch = 11;
            const int wCount = 200000;
            const int total = wBatch * wCount * 3;
            // every item has the value 1, so the expected sum equals the item count
            const int summ = wBatch * wCount * 3;

            int r1 = 0, r2 = 0;
            int read = 0;

            var t1 = Environment.TickCount;

            AsyncPool.RunThread(
                // writer #1: batches of 1s
                () => {
                    var buffer = new int[wBatch];
                    for (int i = 0; i < wBatch; i++)
                        buffer[i] = 1;

                    for (int i = 0; i < wCount; i++)
                        queue.EnqueueRange(buffer, 0, wBatch);
                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                },
                // writer #2: the same number of items, enqueued one at a time
                () => {
                    for (int i = 0; i < wCount * wBatch; i++)
                        queue.Enqueue(1);
                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                },
                // writer #3: batches of 1s
                () => {
                    var buffer = new int[wBatch];
                    for (int i = 0; i < wBatch; i++)
                        buffer[i] = 1;

                    for (int i = 0; i < wCount; i++)
                        queue.EnqueueRange(buffer, 0, wBatch);
                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
                },
                // reader #1: drains until all items have been consumed by someone
                () => {
                    var count = 0;
                    while (read < total) {
                        var buffer = queue.Drain();
                        for (int i = 0; i < buffer.Length; i++) {
                            r1 += buffer[i];
                        }
                        // once per drained buffer, not once per item
                        Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
                },
                // reader #2: same as reader #1 with its own accumulator
                () => {
                    var count = 0;
                    while (read < total) {
                        var buffer = queue.Drain();
                        for (int i = 0; i < buffer.Length; i++) {
                            r2 += buffer[i];
                        }
                        Interlocked.Add(ref read, buffer.Length);
                        count += buffer.Length;
                    }
                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
                }
            )
                .Bundle()
                .Join();

            Assert.AreEqual(summ, r1 + r2);

            Console.WriteLine(
                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
                Environment.TickCount - t1,
                r1,
                r2,
                r1 + r2,
                total
            );
        }

        /// <summary>
        /// Maps 100000 random doubles with <c>ParallelMap</c> (second argument 4)
        /// and compares each result with the same function computed sequentially.
        /// </summary>
        [TestMethod]
        public void ParallelMapTest() {
            const int size = 100000;

            var rng = new Random();
            var inputs = new double[size];
            for (int k = 0; k < inputs.Length; k++)
                inputs[k] = rng.NextDouble();

            var started = Environment.TickCount;
            var mapped = inputs.ParallelMap(x => Math.Sin(x*x), 4).Join();
            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

            started = Environment.TickCount;
            for (int k = 0; k < size; k++) {
                var expected = Math.Sin(inputs[k] * inputs[k]);
                Assert.AreEqual(expected, mapped[k]);
            }
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        }

        /// <summary>
        /// Maps 10000 random doubles with <c>ChainedMap</c>, where each element is
        /// computed by a promise scheduled on a worker pool, and compares the
        /// results with the same function computed sequentially.
        /// </summary>
        [TestMethod]
        public void ChainedMapTest() {
            using (var pool = new WorkerPool()) {
                const int size = 10000;

                var rng = new Random();
                var inputs = new double[size];
                for (int k = 0; k < inputs.Length; k++)
                    inputs[k] = rng.NextDouble();

                var started = Environment.TickCount;
                var mapped = inputs
                    .ChainedMap(
                        // Analysis disable once AccessToDisposedClosure
                        x => pool.Invoke(
                            () => Math.Sin(x * x)
                        ),
                        4
                    )
                    .Join();
                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

                started = Environment.TickCount;
                for (int k = 0; k < size; k++) {
                    var expected = Math.Sin(inputs[k] * inputs[k]);
                    Assert.AreEqual(expected, mapped[k]);
                }
                Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
            }
        }

        /// <summary>
        /// Sums 100000 random integers with <c>ParallelForEach</c> (second argument 4)
        /// and compares the total with a plain sequential sum.
        /// </summary>
        [TestMethod]
        public void ParallelForEachTest() {
            const int size = 100000;

            var rng = new Random();
            var values = new int[size];
            for (int k = 0; k < values.Length; k++)
                values[k] = (int)(rng.NextDouble() * 100);

            int parallelSum = 0;

            var started = Environment.TickCount;
            values.ParallelForEach(x => Interlocked.Add(ref parallelSum, x), 4).Join();
            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - started, parallelSum);

            int sequentialSum = 0;

            started = Environment.TickCount;
            foreach (var value in values)
                sequentialSum += value;
            Assert.AreEqual(sequentialSum, parallelSum);
            Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        }

        /// <summary>
        /// Cancels a chain after its first step has completed: the first step must
        /// not see a cancellation, while the nested step and the chain itself must.
        /// </summary>
        [TestMethod]
        public void ComplexCase1Test() {
            // flags[i] is set when the corresponding stage receives a cancellation
            var flags = new bool[3];

            // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

            var step1 = PromiseHelper
                .Sleep(200, "Alan")
                .On(() => flags[0] = true, PromiseEventType.Cancelled);
            var p = step1
                .Chain(x =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + x)
                        .Then(y => y)
                        .On(() => flags[1] = true, PromiseEventType.Cancelled)
                )
                .On(() => flags[2] = true, PromiseEventType.Cancelled);

            // wait for the first step so that the cancellation hits the second one
            step1.Join();
            p.Cancel();
            try {
                // expected first, actual second
                Assert.AreEqual("Hi, Alan", p.Join());
                Assert.Fail("Shouldn't get here");
            } catch (OperationCanceledException) {
                // expected: joining a cancelled promise throws
            }

            Assert.IsFalse(flags[0]);
            Assert.IsTrue(flags[1]);
            Assert.IsTrue(flags[2]);
        }

        /// <summary>
        /// Starts a long operation from a <c>Then</c> callback and cancels it from
        /// a second, shorter operation.
        /// </summary>
        [TestMethod]
        public void ChainedCancel1Test() {
            // when a chained asynchronous operation is cancelled, the whole promise
            // must complete with an OperationCanceledException error
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Then(x => {
                    // start two asynchronous operations
                    var hem = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");

                    // the second operation cancels the first one before it completes
                    PromiseHelper
                        .Sleep(100, "HAL, STOP!")
                        .Then(hem.Cancel);

                    return hem;
                });

            // NOTE(review): nothing fails the test when Join() returns without
            // throwing - confirm whether that is intended.
            try {
                p.Join();
            } catch (TargetInvocationException failure) {
                Assert.IsTrue(failure.InnerException is OperationCanceledException);
            }
        }

        /// <summary>
        /// Cancelling a promise chain must also cancel the nested operation that
        /// is currently running inside it.
        /// </summary>
        [TestMethod]
        public void ChainedCancel2Test() {
            // when a promise chain is cancelled, nested operations must be cancelled as well
            // pSurvive resolves to true if the nested operation saw the cancellation
            // and to false if it ran to completion
            var pSurvive = new Promise<bool>();
            var hemStarted = new ManualResetEvent(false);
            var p = PromiseHelper
                .Sleep(1, "Hi, HAL!")
                .Chain(x => {
                    hemStarted.Set();
                    // start the nested long-running operation
                    var result = PromiseHelper
                        .Sleep(100000000, "HEM ENABLED!!!")
                        .Then(s => {
                            pSurvive.Resolve(false);
                            return s;
                        });

                    result
                        .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);

                    return result;
                });

            // cancel only after the nested operation has been started
            hemStarted.WaitOne();
            p.Cancel();

            try {
                p.Join();
                // Without this the test passes silently when Join() returns normally.
                // ComplexCase1Test expects the same exception from a cancelled promise.
                Assert.Fail("Join() on a cancelled promise was expected to throw");
            } catch (OperationCanceledException) {
                Assert.IsTrue(pSurvive.Join());
            }
        }

        /// <summary>
        /// Exercises SharedLock with two readers and one writer running on separate
        /// threads: both readers hold the shared lock at the same time, one of them
        /// upgrades to exclusive mode, and the writer takes the lock exclusively.
        /// Every step is recorded in a log queue that is printed at the end.
        /// </summary>
        [TestMethod]
        public void SharedLockTest() {
            var l = new SharedLock();
            // number of threads currently counted as shared / exclusive holders
            int shared = 0;
            int exclusive = 0;
            // set by whichever reader is second to obtain the shared lock
            var s1 = new Signal();
            var log = new AsyncQueue<string>();

            try {
                AsyncPool.RunThread(
                    // Reader #1: takes the shared lock and waits until both readers hold it
                    () => {
                        log.Enqueue("Reader #1 started");
                        try {
                            l.LockShared();
                            log.Enqueue("Reader #1 lock got");
                            // the second reader to get here releases both of them
                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #1 finished");
                            Interlocked.Decrement(ref shared);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #1 lock released");
                        }
                    },
                    // Reader #2: same as reader #1, then upgrades to exclusive mode
                    () => {
                        log.Enqueue("Reader #2 started");

                        try {
                            l.LockShared();
                            log.Enqueue("Reader #2 lock got");

                            if (Interlocked.Increment(ref shared) == 2)
                                s1.Set();
                            s1.Wait();
                            log.Enqueue("Reader #2 upgrading to writer");
                            Interlocked.Decrement(ref shared);
                            l.Upgrade();
                            log.Enqueue("Reader #2 upgraded");

                            // after the upgrade this thread must be the only holder
                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Assert.AreEqual(0, shared);
                            log.Enqueue("Reader #2 finished");
                            Interlocked.Decrement(ref exclusive);
                        } finally {
                            l.Release();
                            log.Enqueue("Reader #2 lock released");
                        }
                    },
                    // Writer #1: takes the lock exclusively and checks it is alone
                    () => {
                        log.Enqueue("Writer #1 started");
                        try {
                            l.LockExclusive();
                            log.Enqueue("Writer #1 got the lock");
                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                            Interlocked.Decrement(ref exclusive);
                            log.Enqueue("Writer #1 is finished");
                        } finally {
                            l.Release();
                            log.Enqueue("Writer #1 lock released");
                        }
                    }
                // NOTE(review): 1000 is presumably a timeout in milliseconds - confirm
                ).Bundle().Join(1000);
                log.Enqueue("Done");
            } catch(Exception error) {
                // record the failure in the log, then let the test fail
                log.Enqueue(error.Message);
                throw;
            } finally {
                // dump the recorded steps whether the test passed or not
                foreach (var m in log)
                    Console.WriteLine(m);
            }
        }
    }
}