view Implab.Test/AsyncTests.cs @ 14:e943453e5039 promises

Implemented interlocked queue, fixed promise synchronization
author cin
date Wed, 06 Nov 2013 17:49:12 +0400
parents b0feb5b9ad1c
children 0f982f9b7d4d
line wrap: on
line source

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Reflection;
using System.Threading;
using Implab.Parallels;

namespace Implab.Test
{
	[TestClass]
	public class AsyncTests
	{
		/// <summary>
		/// Checks that a success handler registered through <c>Then</c> receives
		/// the value the promise is resolved with.
		/// </summary>
		[TestMethod]
		public void ResolveTest ()
		{
			int res = -1;
			var p = new Promise<int> ();
			p.Then (x => res = x);
			p.Resolve (100);

			// MSTest takes (expected, actual); the reversed order yields misleading failure messages.
			Assert.AreEqual (100, res);
		}

        /// <summary>
        /// Checks that rejecting a promise invokes the error handler passed to
        /// <c>Then</c> with the rejection exception and leaves the success
        /// handler uncalled.
        /// </summary>
        [TestMethod]
		public void RejectTest ()
		{
			int res = -1;
			Exception err = null;

			var p = new Promise<int> ();
			p.Then (x => res = x, e => err = e);
			p.Reject (new ApplicationException ("error"));

			// the success handler must not have touched the sentinel value
			Assert.AreEqual (-1, res);
			// fail with a clear message instead of a NullReferenceException below
			Assert.IsNotNull (err, "The error handler was not invoked");
			Assert.AreEqual ("error", err.Message);
		}

        /// <summary>
        /// Checks that <c>Join</c> on an already resolved promise returns the
        /// resolved value.
        /// </summary>
        [TestMethod]
		public void JoinSuccessTest ()
		{
			var p = new Promise<int> ();
			p.Resolve (100);

			// expected value first, as the MSTest signature requires
			Assert.AreEqual (100, p.Join ());
		}

        /// <summary>
        /// Checks that <c>Join</c> on a rejected promise throws a
        /// <see cref="TargetInvocationException"/> whose inner exception is the
        /// rejection reason.
        /// </summary>
        [TestMethod]
		public void JoinFailTest ()
		{
			var p = new Promise<int> ();
			p.Reject (new ApplicationException ("failed"));

			TargetInvocationException caught = null;

			try {
				p.Join ();
			} catch (TargetInvocationException err) {
				caught = err;
			} catch (Exception err) {
				Assert.Fail ("Got wrong exception: " + err.GetType ());
			}

			// Asserting outside the try block keeps "Join did not throw" distinct
			// from "Join threw the wrong exception type".
			Assert.IsNotNull (caught, "Join should have thrown for a rejected promise");
			Assert.AreEqual ("failed", caught.InnerException.Message);
		}

        /// <summary>
        /// Checks that <c>Map</c> produces a promise carrying the transformed
        /// result of the source promise.
        /// </summary>
        [TestMethod]
		public void MapTest ()
		{
			var p = new Promise<int> ();

			var p2 = p.Map (x => x.ToString ());
			p.Resolve (100);

			// expected value first, as the MSTest signature requires
			Assert.AreEqual ("100", p2.Join ());
		}

        /// <summary>
        /// Checks that the promise returned by <c>Error</c> yields the handler's
        /// replacement value when the source promise is rejected.
        /// </summary>
        [TestMethod]
        public void FixErrorTest() {
            var p = new Promise<int>();

            var p2 = p.Error(e => 101);

            p.Reject(new Exception());

            // expected value first, as the MSTest signature requires
            Assert.AreEqual(101, p2.Join());
        }

        /// <summary>
        /// Checks that <c>Chain</c> returns a promise that yields the result of
        /// the promise produced by the chained callback.
        /// </summary>
        [TestMethod]
		public void ChainTest ()
		{
			var p1 = new Promise<int> ();

			var p3 = p1.Chain (x => {
				// the inner promise is already resolved when it is handed back
				var p2 = new Promise<string> ();
				p2.Resolve (x.ToString ());
				return p2;
			});

			p1.Resolve (100);

			// expected value first, as the MSTest signature requires
			Assert.AreEqual ("100", p3.Join ());
		}

        /// <summary>
        /// Checks that a delegate run through <c>AsyncPool.Invoke</c> reports a
        /// managed thread id different from the one of the test thread.
        /// </summary>
        [TestMethod]
		public void PoolTest ()
		{
			var callerThreadId = Thread.CurrentThread.ManagedThreadId;
			var pending = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);

			// wait for the delegate and take the thread id it observed
			var workerThreadId = pending.Join ();

			Assert.AreNotEqual (callerThreadId, workerThreadId);
		}

        /// <summary>
        /// Checks the value reported by <c>ThreadCount</c> for a pool created
        /// with the arguments (5, 10): 5 right after construction, still 5 after
        /// three queued tasks, and 10 after a further hundred tasks are queued.
        /// </summary>
        [TestMethod]
        public void WorkerPoolSizeTest() {
            // NOTE(review): presumably the minimum and maximum thread counts of the pool - confirm against WorkerPool
            const int lowerBound = 5;
            const int upperBound = 10;

            var workers = new WorkerPool(lowerBound, upperBound);

            Assert.AreEqual(lowerBound, workers.ThreadCount);

            // a light load: three tasks, each sleeping for one second
            for (int n = 0; n < 3; n++) {
                workers.Invoke(() => { Thread.Sleep(1000); return 10; });
            }

            Assert.AreEqual(lowerBound, workers.ThreadCount);

            // a heavy load: a hundred more sleeping tasks
            int remaining = 100;
            while (remaining > 0) {
                workers.Invoke(() => { Thread.Sleep(1000); return 10; });
                remaining--;
            }

            Assert.AreEqual(upperBound, workers.ThreadCount);
        }

        /// <summary>
        /// Queues 1000 trivial tasks on a <c>WorkerPool</c>, waits until every
        /// success handler has run and checks that the results sum to 1000.
        /// </summary>
        [TestMethod]
        public void WorkerPoolCorrectTest() {
            const int iterations = 1000;
            var pool = new WorkerPool(5, 20);

            var count = 0;
            var pending = iterations;
            var done = new ManualResetEvent(false);

            for (int i = 0; i < iterations; i++) {
                pool
                    .Invoke(() => 1)
                    .Then(x => {
                        Interlocked.Add(ref count, x);
                        // the handler that finishes last releases the test thread
                        if (Interlocked.Decrement(ref pending) == 0)
                            done.Set();
                    });
            }

            // NOTE(review): the tasks presumably complete asynchronously on pool threads
            // (WorkerPool is not visible here), so the sum is only checked after every
            // handler has reported back. The timeout keeps a lost handler from hanging the run.
            Assert.IsTrue(done.WaitOne(30000), "Timed out waiting for the pool tasks to complete");
            Assert.AreEqual(iterations, count);
        }

        /// <summary>
        /// Exercises <c>MTQueue</c>: first single-threaded (FIFO order, empty
        /// queue reporting), then with 3 writer threads and 10 reader threads,
        /// checking that every enqueued item is dequeued exactly once by
        /// comparing the sum of the dequeued values with the number of items.
        /// </summary>
        [TestMethod]
        public void MTQueueTest() {
            var queue = new MTQueue<int>();

            int res;

            // single element round trip, then the queue must report empty
            queue.Enqueue(10);
            Assert.IsTrue(queue.TryDequeue(out res));
            Assert.AreEqual(10, res);
            Assert.IsFalse(queue.TryDequeue(out res));

            // items must come out in the order they were put in
            for (int i = 0; i < 1000; i++)
                queue.Enqueue(i);

            for (int i = 0; i < 1000; i++) {
                // a failed dequeue must not silently reuse the previous value of res
                Assert.IsTrue(queue.TryDequeue(out res));
                Assert.AreEqual(i, res);
            }

            const int itemsPerWriter = 1000;
            const int writersCount = 3;
            const int readersCount = 10;

            // Both counters start at their final values so that a thread finishing
            // early can never observe a transient zero while others are still being started.
            int writers = writersCount;
            int readers = readersCount;
            var stop = new ManualResetEvent(false);
            int total = 0;

            for (int i = 0; i < writersCount; i++) {
                var wn = i;
                AsyncPool
                    .InvokeNewThread(() => {
                        Console.WriteLine("Started writer: {0}", wn);
                        for (int ii = 0; ii < itemsPerWriter; ii++) {
                            queue.Enqueue(1);
                            Thread.Sleep(1);
                        }
                        Console.WriteLine("Stopped writer: {0}", wn);
                        return 1;
                    })
                    .Then(x => Interlocked.Decrement(ref writers) );
            }
            
            for (int i = 0; i < readersCount; i++) {
                var wn = i;
                AsyncPool
                    .InvokeNewThread(() => {
                        int t;
                        bool writersDone;
                        Console.WriteLine("Started reader: {0}", wn);
                        do {
                            // Sample the writer count BEFORE draining: if no writers were left
                            // at this point, everything they enqueued is already in the queue
                            // and the drain below cannot miss an item.
                            writersDone = Interlocked.CompareExchange(ref writers, 0, 0) == 0;
                            while (queue.TryDequeue(out t))
                                Interlocked.Add(ref total, t);
                            if (!writersDone)
                                Thread.Sleep(0);
                        } while (!writersDone);
                        Console.WriteLine("Stopped reader: {0}", wn);
                        return 1;
                    })
                    .Then(x => {
                        // use the value returned by the atomic decrement, not a separate read
                        if (Interlocked.Decrement(ref readers) == 0)
                            stop.Set();
                    });
            }

            stop.WaitOne();

            Assert.AreEqual(itemsPerWriter * writersCount, total);
        }

        /// <summary>
        /// Builds a two-stage promise chain, cancels it 300 ms after creation and
        /// checks that <c>Join</c> throws <see cref="OperationCanceledException"/>
        /// and that only the second stage and the outer chain saw the cancellation.
        /// </summary>
        [TestMethod]
        public void ComplexCase1Test() {
            bool firstCancelled = false;
            bool secondCancelled = false;
            bool chainCancelled = false;

            // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

            var chain = PromiseHelper
                .Sleep(200, "Alan")
                .Cancelled(() => firstCancelled = true)
                .Chain(name =>
                    PromiseHelper
                        .Sleep(200, "Hi, " + name)
                        .Map( greeting => greeting )
                        .Cancelled(() => secondCancelled = true)
                )
                .Cancelled(() => chainCancelled = true);

            // NOTE(review): presumably op1 has finished and op2 is still pending at this point - confirm against PromiseHelper.Sleep
            Thread.Sleep(300);
            chain.Cancel();

            try {
                Assert.AreEqual(chain.Join(), "Hi, Alan");
                Assert.Fail("Shouldn't get here");
            } catch(OperationCanceledException) {
                // expected outcome: Join on the cancelled chain throws
            }

            Assert.IsFalse(firstCancelled);
            Assert.IsTrue(secondCancelled);
            Assert.IsTrue(chainCancelled);
        }
	}
}