diff Implab.Test/AsyncTests.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents ec91a6dfa5b3
children 8200ab154c8a
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Sep 03 18:34:02 2014 +0400
+++ b/Implab.Test/AsyncTests.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -1,386 +1,863 @@
-using System;
-using Microsoft.VisualStudio.TestTools.UnitTesting;
-using System.Reflection;
-using System.Threading;
-using Implab.Parallels;
-
-namespace Implab.Test {
-    [TestClass]
-    public class AsyncTests {
-        [TestMethod]
-        public void ResolveTest() {
-            int res = -1;
-            var p = new Promise<int>();
-            p.Then(x => res = x);
-            p.Resolve(100);
-
-            Assert.AreEqual(100, res);
-        }
-
-        [TestMethod]
-        public void RejectTest() {
-            int res = -1;
-            Exception err = null;
-
-            var p = new Promise<int>();
-            p.Then(x => res = x, e => err = e);
-            p.Reject(new ApplicationException("error"));
-
-            Assert.AreEqual(res, -1);
-            Assert.AreEqual(err.Message, "error");
-
-        }
-
-        [TestMethod]
-        public void JoinSuccessTest() {
-            var p = new Promise<int>();
-            p.Resolve(100);
-            Assert.AreEqual(p.Join(), 100);
-        }
-
-        [TestMethod]
-        public void JoinFailTest() {
-            var p = new Promise<int>();
-            p.Reject(new ApplicationException("failed"));
-
-            try {
-                p.Join();
-                throw new ApplicationException("WRONG!");
-            } catch (TargetInvocationException err) {
-                Assert.AreEqual(err.InnerException.Message, "failed");
-            } catch {
-                Assert.Fail("Got wrong excaption");
-            }
-        }
-
-        [TestMethod]
-        public void MapTest() {
-            var p = new Promise<int>();
-
-            var p2 = p.Map(x => x.ToString());
-            p.Resolve(100);
-
-            Assert.AreEqual(p2.Join(), "100");
-        }
-
-        [TestMethod]
-        public void FixErrorTest() {
-            var p = new Promise<int>();
-
-            var p2 = p.Error(e => 101);
-
-            p.Reject(new Exception());
-
-            Assert.AreEqual(p2.Join(), 101);
-        }
-
-        [TestMethod]
-        public void ChainTest() {
-            var p1 = new Promise<int>();
-
-            var p3 = p1.Chain(x => {
-                var p2 = new Promise<string>();
-                p2.Resolve(x.ToString());
-                return p2;
-            });
-
-            p1.Resolve(100);
-
-            Assert.AreEqual(p3.Join(), "100");
-        }
-
-        [TestMethod]
-        public void PoolTest() {
-            var pid = Thread.CurrentThread.ManagedThreadId;
-            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
-
-            Assert.AreNotEqual(pid, p.Join());
-        }
-
-        [TestMethod]
-        public void WorkerPoolSizeTest() {
-            var pool = new WorkerPool(5, 10, 0);
-
-            Assert.AreEqual(5, pool.PoolSize);
-
-            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
-
-            Assert.AreEqual(5, pool.PoolSize);
-
-            for (int i = 0; i < 100; i++)
-                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
-            Thread.Sleep(200);
-            Assert.AreEqual(10, pool.PoolSize);
-
-            pool.Dispose();
-        }
-
-        [TestMethod]
-        public void WorkerPoolCorrectTest() {
-            var pool = new WorkerPool(0,1000,100);
-
-            int iterations = 1000;
-            int pending = iterations;
-            var stop = new ManualResetEvent(false);
-
-            var count = 0;
-            for (int i = 0; i < iterations; i++) {
-                pool
-                    .Invoke(() => 1)
-                    .Then(x => Interlocked.Add(ref count, x))
-                    .Then(x => Math.Log10(x))
-                    .Anyway(() => {
-                        Interlocked.Decrement(ref pending);
-                        if (pending == 0)
-                            stop.Set();
-                    });
-            }
-
-            stop.WaitOne();
-
-            Assert.AreEqual(iterations, count);
-            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
-            pool.Dispose();
-            
-        }
-
-        [TestMethod]
-        public void WorkerPoolDisposeTest() {
-            var pool = new WorkerPool(5, 20);
-            Assert.AreEqual(5, pool.PoolSize);
-            pool.Dispose();
-            Thread.Sleep(500);
-            Assert.AreEqual(0, pool.PoolSize);
-            pool.Dispose();
-        }
-
-        [TestMethod]
-        public void MTQueueTest() {
-            var queue = new MTQueue<int>();
-            int res;
-
-            queue.Enqueue(10);
-            Assert.IsTrue(queue.TryDequeue(out res));
-            Assert.AreEqual(10, res);
-            Assert.IsFalse(queue.TryDequeue(out res));
-
-            for (int i = 0; i < 1000; i++)
-                queue.Enqueue(i);
-
-            for (int i = 0; i < 1000; i++) {
-                queue.TryDequeue(out res);
-                Assert.AreEqual(i, res);
-            }
-
-            int writers = 0;
-            int readers = 0;
-            var stop = new ManualResetEvent(false);
-            int total = 0;
-
-            int itemsPerWriter = 1000;
-            int writersCount = 3;
-
-            for (int i = 0; i < writersCount; i++) {
-                Interlocked.Increment(ref writers);
-                var wn = i;
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        for (int ii = 0; ii < itemsPerWriter; ii++) {
-                            queue.Enqueue(1);
-                        }
-                        return 1;
-                    })
-                    .Anyway(() => Interlocked.Decrement(ref writers));
-            }
-
-            for (int i = 0; i < 10; i++) {
-                Interlocked.Increment(ref readers);
-                var wn = i;
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        int t;
-                        do {
-                            while (queue.TryDequeue(out t))
-                                Interlocked.Add(ref total, t);
-                        } while (writers > 0);
-                        return 1;
-                    })
-                    .Anyway(() => {
-                        Interlocked.Decrement(ref readers);
-                        if (readers == 0)
-                            stop.Set();
-                    });
-            }
-
-            stop.WaitOne();
-
-            Assert.AreEqual(itemsPerWriter * writersCount, total);
-        }
-
-        [TestMethod]
-        public void ParallelMapTest() {
-
-            int count = 100000;
-
-            double[] args = new double[count];
-            var rand = new Random();
-
-            for (int i = 0; i < count; i++)
-                args[i] = rand.NextDouble();
-
-            var t = Environment.TickCount;
-            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
-
-            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
-
-            t = Environment.TickCount;
-            for (int i = 0; i < count; i++)
-                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
-            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
-        }
-
-        [TestMethod]
-        public void ChainedMapTest() {
-
-            using (var pool = new WorkerPool(0,100,100)) {
-                int count = 10000;
-
-                double[] args = new double[count];
-                var rand = new Random();
-
-                for (int i = 0; i < count; i++)
-                    args[i] = rand.NextDouble();
-
-                var t = Environment.TickCount;
-                var res = args
-                    .ChainedMap(
-                        x => pool.Invoke(
-                            () => Math.Sin(x * x)
-                        ),
-                        4
-                    )
-                    .Join();
-
-                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
-
-                t = Environment.TickCount;
-                for (int i = 0; i < count; i++)
-                    Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
-                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
-                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
-            }
-        }
-
-        [TestMethod]
-        public void ParallelForEachTest() {
-
-            int count = 100000;
-
-            int[] args = new int[count];
-            var rand = new Random();
-
-            for (int i = 0; i < count; i++)
-                args[i] = (int)(rand.NextDouble() * 100);
-
-            int result = 0;
-
-            var t = Environment.TickCount;
-            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
-
-            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
-
-            int result2 = 0;
-
-            t = Environment.TickCount;
-            for (int i = 0; i < count; i++)
-                result2 += args[i];
-            Assert.AreEqual(result2, result);
-            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
-        }
-
-        [TestMethod]
-        public void ComplexCase1Test() {
-            var flags = new bool[3];
-
-            // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
-
-            var p = PromiseHelper
-                .Sleep(200, "Alan")
-                .Cancelled(() => flags[0] = true)
-                .Chain(x =>
-                    PromiseHelper
-                        .Sleep(200, "Hi, " + x)
-                        .Map(y => y)
-                        .Cancelled(() => flags[1] = true)
-                )
-                .Cancelled(() => flags[2] = true);
-            Thread.Sleep(300);
-            p.Cancel();
-            try {
-                Assert.AreEqual(p.Join(), "Hi, Alan");
-                Assert.Fail("Shouldn't get here");
-            } catch (OperationCanceledException) {
-            }
-
-            Assert.IsFalse(flags[0]);
-            Assert.IsTrue(flags[1]);
-            Assert.IsTrue(flags[2]);
-        }
-
-        [TestMethod]
-        public void ChainedCancel1Test() {
-            //        
-            //   OperationCanceledException
-            var p = PromiseHelper
-                .Sleep(1, "Hi, HAL!")
-                .Chain(x => {
-                    //    
-                    var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
-                    //      
-                    PromiseHelper
-                        .Sleep(100, "HAL, STOP!")
-                        .Then(() => result.Cancel());
-                    return result;
-                });
-            try {
-                p.Join();
-            } catch (TargetInvocationException err) {
-                Assert.IsTrue(err.InnerException is OperationCanceledException);
-            }
-        }
-
-        [TestMethod]
-        public void ChainedCancel2Test() {
-            //    ,     
-            IPromiseBase p = null;
-            var pSurvive = new Promise<bool>();
-            var hemStarted = new ManualResetEvent(false);
-            p = PromiseHelper
-                .Sleep(1, "Hi, HAL!")
-                .Chain(x => {
-                    hemStarted.Set();
-                    //    
-                    var result = PromiseHelper
-                        .Sleep(1000, "HEM ENABLED!!!")
-                        .Then(s => pSurvive.Resolve(false));
-
-                    result
-                        .Cancelled(() => pSurvive.Resolve(true));
-                    
-                    return result;
-                });
-
-            hemStarted.WaitOne();
-            p.Cancel();
-
-            try {
-                p.Join();
-            } catch (OperationCanceledException) {
-                Assert.IsTrue(pSurvive.Join());
-            }
-        }
-    }
-}
-
+using System;
+using System.Reflection;
+using System.Threading;
+using Implab.Parallels;
+
+#if MONO
+
+using NUnit.Framework;
+using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
+using TestMethodAttribute = NUnit.Framework.TestAttribute;
+
+#else
+
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+#endif
+
+namespace Implab.Test {
+    [TestClass]
+    public class AsyncTests {
+        [TestMethod]
+        public void ResolveTest() {
+            int res = -1;
+            var p = new Promise<int>();
+            p.Then(x => res = x);
+            p.Resolve(100);
+
+            Assert.AreEqual(100, res);
+        }
+
+        [TestMethod]
+        public void RejectTest() {
+            int res = -1;
+            Exception err = null;
+
+            var p = new Promise<int>();
+            p.Then(
+                x => res = x,
+                e => {
+                    err = e;
+                    return -2;
+                }
+            );
+            p.Reject(new ApplicationException("error"));
+
+            Assert.AreEqual(res, -1);
+            Assert.AreEqual(err.Message, "error");
+
+        }
+
+        [TestMethod]
+        public void CancelExceptionTest() {
+            var p = new Promise<bool>();
+            p.CancelOperation(null);
+
+            var p2 = p.Then(x => x, null, reason => {
+                throw new ApplicationException("CANCELLED"); 
+            });
+
+            try {
+                p2.Join();
+                Assert.Fail();
+            } catch (ApplicationException err) {
+                Assert.AreEqual("CANCELLED", err.InnerException.Message);
+            }
+
+        }
+
+        [TestMethod]
+        public void ContinueOnCancelTest() {
+            var p = new Promise<bool>();
+            p.CancelOperation(null);
+
+            var p2 = p
+                .Then(x => x, null, reason => {
+                    throw new ApplicationException("CANCELLED");
+                })
+                .Then(x => x, e => true);
+
+            Assert.AreEqual(true, p2.Join());
+        }
+
+        [TestMethod]
+        public void JoinSuccessTest() {
+            var p = new Promise<int>();
+            p.Resolve(100);
+            Assert.AreEqual(p.Join(), 100);
+        }
+
+        [TestMethod]
+        public void JoinFailTest() {
+            var p = new Promise<int>();
+            p.Reject(new ApplicationException("failed"));
+
+            try {
+                p.Join();
+                throw new ApplicationException("WRONG!");
+            } catch (TargetInvocationException err) {
+                Assert.AreEqual(err.InnerException.Message, "failed");
+            } catch {
+                Assert.Fail("Got wrong excaption");
+            }
+        }
+
+        [TestMethod]
+        public void MapTest() {
+            var p = new Promise<int>();
+
+            var p2 = p.Then(x => x.ToString());
+            p.Resolve(100);
+
+            Assert.AreEqual(p2.Join(), "100");
+        }
+
+        [TestMethod]
+        public void FixErrorTest() {
+            var p = new Promise<int>();
+
+            var p2 = p.Then(x => x, e => 101);
+
+            p.Reject(new Exception());
+
+            Assert.AreEqual(p2.Join(), 101);
+        }
+
+        [TestMethod]
+        public void ChainTest() {
+            var p1 = new Promise<int>();
+
+            var p3 = p1.Chain(x => {
+                var p2 = new Promise<string>();
+                p2.Resolve(x.ToString());
+                return p2;
+            });
+
+            p1.Resolve(100);
+
+            Assert.AreEqual(p3.Join(), "100");
+        }
+
+        [TestMethod]
+        public void ChainFailTest() {
+            var p1 = new Promise<int>();
+
+            var p3 = p1.Chain(x => {
+                var p2 = new Promise<string>();
+                p2.Reject(new Exception("DIE!!!"));
+                return p2;
+            });
+
+            p1.Resolve(100);
+
+            Assert.IsTrue(p3.IsResolved);
+        }
+
+        [TestMethod]
+        public void PoolTest() {
+            var pid = Thread.CurrentThread.ManagedThreadId;
+            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
+
+            Assert.AreNotEqual(pid, p.Join());
+        }
+
+        [TestMethod]
+        public void WorkerPoolSizeTest() {
+            var pool = new WorkerPool(5, 10, 1);
+
+            Assert.AreEqual(5, pool.PoolSize);
+
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+
+            Assert.AreEqual(5, pool.PoolSize);
+
+            for (int i = 0; i < 100; i++)
+                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            Thread.Sleep(200);
+            Assert.AreEqual(10, pool.PoolSize);
+
+            pool.Dispose();
+        }
+
+        [TestMethod]
+        public void WorkerPoolCorrectTest() {
+            var pool = new WorkerPool(0,1000,100);
+
+            const int iterations = 1000;
+            int pending = iterations;
+            var stop = new ManualResetEvent(false);
+
+            var count = 0;
+            for (int i = 0; i < iterations; i++) {
+                pool
+                    .Invoke(() => 1)
+                    .Then(x => Interlocked.Add(ref count, x))
+                    .Then(x => Math.Log10(x))
+                    .On(() => {
+                        Interlocked.Decrement(ref pending);
+                        if (pending == 0)
+                            stop.Set();
+                    }, PromiseEventType.All);
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(iterations, count);
+            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
+            pool.Dispose();
+            
+        }
+
+        [TestMethod]
+        public void WorkerPoolDisposeTest() {
+            var pool = new WorkerPool(5, 20);
+            Assert.AreEqual(5, pool.PoolSize);
+            pool.Dispose();
+            Thread.Sleep(500);
+            Assert.AreEqual(0, pool.PoolSize);
+            pool.Dispose();
+        }
+
+        [TestMethod]
+        public void MTQueueTest() {
+            var queue = new MTQueue<int>();
+            int res;
+
+            queue.Enqueue(10);
+            Assert.IsTrue(queue.TryDequeue(out res));
+            Assert.AreEqual(10, res);
+            Assert.IsFalse(queue.TryDequeue(out res));
+
+            for (int i = 0; i < 1000; i++)
+                queue.Enqueue(i);
+
+            for (int i = 0; i < 1000; i++) {
+                queue.TryDequeue(out res);
+                Assert.AreEqual(i, res);
+            }
+
+            int writers = 0;
+            int readers = 0;
+            var stop = new ManualResetEvent(false);
+            int total = 0;
+
+            const int itemsPerWriter = 10000;
+            const int writersCount = 10;
+
+            for (int i = 0; i < writersCount; i++) {
+                Interlocked.Increment(ref writers);
+                AsyncPool
+                    .RunThread(() => {
+                        for (int ii = 0; ii < itemsPerWriter; ii++) {
+                            queue.Enqueue(1);
+                        }
+                        return 1;
+                    })
+                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
+            }
+
+            for (int i = 0; i < 10; i++) {
+                Interlocked.Increment(ref readers);
+                AsyncPool
+                    .RunThread(() => {
+                        int t;
+                        do {
+                            while (queue.TryDequeue(out t))
+                                Interlocked.Add(ref total, t);
+                        } while (writers > 0);
+                        return 1;
+                    })
+                    .On(() => {
+                        Interlocked.Decrement(ref readers);
+                        if (readers == 0)
+                            stop.Set();
+                    }, PromiseEventType.All);
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(100000, total);
+        }
+
+        [TestMethod]
+        public void AsyncQueueTest() {
+            var queue = new AsyncQueue<int>();
+            int res;
+
+            queue.Enqueue(10);
+            Assert.IsTrue(queue.TryDequeue(out res));
+            Assert.AreEqual(10, res);
+            Assert.IsFalse(queue.TryDequeue(out res));
+
+            for (int i = 0; i < 1000; i++)
+                queue.Enqueue(i);
+
+            for (int i = 0; i < 1000; i++) {
+                queue.TryDequeue(out res);
+                Assert.AreEqual(i, res);
+            }
+
+            const int count = 10000000;
+
+            int res1 = 0, res2 = 0;
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(2);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res1 += temp;
+                        }
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res2 += temp;
+                        }
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(count * 3, res1 + res2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                res1,
+                res2,
+                res1 + res2,
+                count
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueBatchTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 29;
+            const int wCount = 400000;
+            const int total = wBatch * wCount * 2;
+            const int summ = wBatch * wCount * 3;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 111;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 2;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueChunkDequeueTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 31;
+            const int wCount = 200000;
+            const int total = wBatch * wCount * 3;
+            const int summ = wBatch * wCount * 6;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 1024;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 2;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 3;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+                    int count = 1;
+                    double avgchunk = 0;
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                            avgchunk = avgchunk*(count-1)/count + actual/(double)count;
+                            count ++;
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
+                }
+            )
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueDrainTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 11;
+            const int wCount = 200000;
+            const int total = wBatch * wCount * 3;
+            const int summ = wBatch * wCount * 3;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 11;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for(int i =0; i < wCount * wBatch; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                /*() => {
+                    int temp;
+                    int count = 0;
+                    while (read < total)
+                        if (queue.TryDequeue(out temp)) {
+                            count++;
+                            r1 += temp;
+                            Interlocked.Increment(ref read);
+                        }
+                    Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
+                },*/
+                /*() => {
+                    var buffer = new int[rBatch];
+                    var count = 0;
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                            count += actual;
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },*/
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r1 += buffer[i];
+                            Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r2 += buffer[i];
+                        Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
+                }
+            )
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
+        public void ParallelMapTest() {
+
+            const int count = 100000;
+
+            var args = new double[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = rand.NextDouble();
+
+            var t = Environment.TickCount;
+            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
+
+            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
+        public void ChainedMapTest() {
+
+            using (var pool = new WorkerPool()) {
+                const int count = 10000;
+
+                var args = new double[count];
+                var rand = new Random();
+
+                for (int i = 0; i < count; i++)
+                    args[i] = rand.NextDouble();
+
+                var t = Environment.TickCount;
+                var res = args
+                    .ChainedMap(
+                        // Analysis disable once AccessToDisposedClosure
+                        x => pool.Invoke(
+                            () => Math.Sin(x * x)
+                        ),
+                        4
+                    )
+                    .Join();
+
+                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
+
+                t = Environment.TickCount;
+                for (int i = 0; i < count; i++)
+                    Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
+            }
+        }
+
+        [TestMethod]
+        public void ParallelForEachTest() {
+
+            const int count = 100000;
+
+            var args = new int[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = (int)(rand.NextDouble() * 100);
+
+            int result = 0;
+
+            var t = Environment.TickCount;
+            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
+
+            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
+
+            int result2 = 0;
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                result2 += args[i];
+            Assert.AreEqual(result2, result);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
+
+        [TestMethod]
+        public void ComplexCase1Test() {
+            var flags = new bool[3];
+
+            // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
+
+            var step1 = PromiseHelper
+                .Sleep(200, "Alan")
+                .On(() => flags[0] = true, PromiseEventType.Cancelled);
+            var p = step1
+                .Chain(x =>
+                    PromiseHelper
+                        .Sleep(200, "Hi, " + x)
+                        .Then(y => y)
+                        .On(() => flags[1] = true, PromiseEventType.Cancelled)
+                )
+                .On(() => flags[2] = true, PromiseEventType.Cancelled);
+            step1.Join();
+            p.Cancel();
+            try {
+                Assert.AreEqual(p.Join(), "Hi, Alan");
+                Assert.Fail("Shouldn't get here");
+            } catch (OperationCanceledException) {
+            }
+
+            Assert.IsFalse(flags[0]);
+            Assert.IsTrue(flags[1]);
+            Assert.IsTrue(flags[2]);
+        }
+
+        [TestMethod]
+        public void ChainedCancel1Test() {
+            // при отмене сцепленной асинхронной операции все обещание должно
+            // завершаться ошибкой OperationCanceledException
+            var p = PromiseHelper
+                .Sleep(1, "Hi, HAL!")
+                .Then(x => {
+                    // запускаем две асинхронные операции
+                    var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
+                    // вторая операция отменяет первую до завершения
+                    PromiseHelper
+                        .Sleep(100, "HAL, STOP!")
+                        .Then(result.Cancel);
+                    return result;
+                });
+            try {
+                p.Join();
+            } catch (TargetInvocationException err) {
+                Assert.IsTrue(err.InnerException is OperationCanceledException);
+            }
+        }
+
+        [TestMethod]
+        public void ChainedCancel2Test() {
+            // при отмене цепочки обещаний, вложенные операции также должны отменяться
+            var pSurvive = new Promise<bool>();
+            var hemStarted = new Signal();
+            var p = PromiseHelper
+                .Sleep(1, "Hi, HAL!")
+                .Chain(() => {
+                    hemStarted.Set();
+                    // запускаем две асинхронные операции
+                    var result = PromiseHelper
+                        .Sleep(2000, "HEM ENABLED!!!")
+                        .Then(() => pSurvive.Resolve(false));
+
+                    result
+                        .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);
+
+                    return result;
+                });
+
+            hemStarted.Wait();
+            p.Cancel();
+
+            try {
+                p.Join();
+                Assert.Fail();
+            } catch (OperationCanceledException) {
+            }
+            Assert.IsTrue(pSurvive.Join());
+        }
+
+        [TestMethod]
+        public void SharedLockTest() {
+            var l = new SharedLock();
+            int shared = 0;
+            int exclusive = 0;
+            var s1 = new Signal();
+            var log = new AsyncQueue<string>();
+
+            try {
+                AsyncPool.RunThread(
+                    () => {
+                        log.Enqueue("Reader #1 started");
+                        try {
+                            l.LockShared();
+                            log.Enqueue("Reader #1 lock got");
+                            if (Interlocked.Increment(ref shared) == 2)
+                                s1.Set();
+                            s1.Wait();
+                            log.Enqueue("Reader #1 finished");
+                            Interlocked.Decrement(ref shared);
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Reader #1 lock released");
+                        }
+                    },
+                    () => {
+                        log.Enqueue("Reader #2 started");
+
+                        try {
+                            l.LockShared();
+                            log.Enqueue("Reader #2 lock got");
+
+                            if (Interlocked.Increment(ref shared) == 2)
+                                s1.Set();
+                            s1.Wait();
+                            log.Enqueue("Reader #2 upgrading to writer");
+                            Interlocked.Decrement(ref shared);
+                            l.Upgrade();
+                            log.Enqueue("Reader #2 upgraded");
+
+                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
+                            Assert.AreEqual(0, shared);
+                            log.Enqueue("Reader #2 finished");
+                            Interlocked.Decrement(ref exclusive);
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Reader #2 lock released");
+                        }
+                    },
+                    () => {
+                        log.Enqueue("Writer #1 started");
+                        try {
+                            l.LockExclusive();
+                            log.Enqueue("Writer #1 got the lock");
+                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
+                            Interlocked.Decrement(ref exclusive);
+                            log.Enqueue("Writer #1 is finished");
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Writer #1 lock released");
+                        }
+                    }
+                ).Bundle().Join(1000);
+                log.Enqueue("Done");
+            } catch(Exception error) {
+                log.Enqueue(error.Message);
+                throw;
+            } finally {
+                foreach (var m in log)
+                    Console.WriteLine(m);
+            }
+        }
+
+        #if NET_4_5
+
+        [TestMethod]
+        public async void TaskInteropTest() {
+            var promise = new Promise<int>();
+            promise.Resolve(10);
+            var res = await promise;
+
+            Assert.AreEqual(10, res);
+        }
+
+        #endif
+    }
+}
+