Mercurial > pub > ImplabNet
view Implab.Test/AsyncTests.cs @ 149:eb793fbbe4ea v2
fixed promises cancellation
author | cin |
---|---|
date | Wed, 06 May 2015 17:11:27 +0300 |
parents | 706fccb85524 |
children | ec91a6dfa5b3 |
line wrap: on
line source
using System; using System.Reflection; using System.Threading; using Implab.Parallels; #if MONO using NUnit.Framework; using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; using TestMethodAttribute = NUnit.Framework.TestAttribute; #else using Microsoft.VisualStudio.TestTools.UnitTesting; #endif namespace Implab.Test { [TestClass] public class AsyncTests { [TestMethod] public void ResolveTest() { int res = -1; var p = new Promise<int>(); p.Then(x => res = x); p.Resolve(100); Assert.AreEqual(100, res); } [TestMethod] public void RejectTest() { int res = -1; Exception err = null; var p = new Promise<int>(); p.Then( x => res = x, e => { err = e; return -2; } ); p.Reject(new ApplicationException("error")); Assert.AreEqual(res, -1); Assert.AreEqual(err.Message, "error"); } [TestMethod] public void CancelExceptionTest() { var p = new Promise<bool>(); p.CancelOperation(null); var p2 = p.Then(x => x, null, reason => { throw new ApplicationException("CANCELLED"); }); try { p2.Join(); Assert.Fail(); } catch (ApplicationException err) { Assert.AreEqual("CANCELLED", err.InnerException.Message); } } [TestMethod] public void ContinueOnCancelTest() { var p = new Promise<bool>(); p.CancelOperation(null); var p2 = p .Then(x => x, null, reason => { throw new ApplicationException("CANCELLED"); }) .Then(x => x, e => true); Assert.AreEqual(true, p2.Join()); } [TestMethod] public void JoinSuccessTest() { var p = new Promise<int>(); p.Resolve(100); Assert.AreEqual(p.Join(), 100); } [TestMethod] public void JoinFailTest() { var p = new Promise<int>(); p.Reject(new ApplicationException("failed")); try { p.Join(); throw new ApplicationException("WRONG!"); } catch (TargetInvocationException err) { Assert.AreEqual(err.InnerException.Message, "failed"); } catch { Assert.Fail("Got wrong excaption"); } } [TestMethod] public void MapTest() { var p = new Promise<int>(); var p2 = p.Then(x => x.ToString()); p.Resolve(100); Assert.AreEqual(p2.Join(), "100"); } [TestMethod] public void FixErrorTest() { var p = new Promise<int>(); var p2 = p.Then(x => x, e => 101); p.Reject(new Exception()); Assert.AreEqual(p2.Join(), 101); } [TestMethod] public void ChainTest() { var p1 = new Promise<int>(); var p3 = p1.Chain(x => { var p2 = new Promise<string>(); p2.Resolve(x.ToString()); return p2; }); p1.Resolve(100); Assert.AreEqual(p3.Join(), "100"); } [TestMethod] public void ChainFailTest() { var p1 = new Promise<int>(); var p3 = p1.Chain(x => { var p2 = new Promise<string>(); p2.Reject(new Exception("DIE!!!")); return p2; }); p1.Resolve(100); Assert.IsTrue(p3.IsResolved); } [TestMethod] public void PoolTest() { var pid = Thread.CurrentThread.ManagedThreadId; var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); Assert.AreNotEqual(pid, p.Join()); } [TestMethod] public void WorkerPoolSizeTest() { var pool = new WorkerPool(5, 10, 1); Assert.AreEqual(5, pool.PoolSize); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Assert.AreEqual(5, pool.PoolSize); for (int i = 0; i < 100; i++) pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); Thread.Sleep(200); Assert.AreEqual(10, pool.PoolSize); pool.Dispose(); } [TestMethod] public void WorkerPoolCorrectTest() { var pool = new WorkerPool(0,1000,100); const int iterations = 1000; int pending = iterations; var stop = new ManualResetEvent(false); var count = 0; for (int i = 0; i < iterations; i++) { pool .Invoke(() => 1) .Then(x => Interlocked.Add(ref count, x)) .Then(x => Math.Log10(x)) .On(() => { Interlocked.Decrement(ref pending); if (pending == 0) stop.Set(); }, PromiseEventType.All); } stop.WaitOne(); Assert.AreEqual(iterations, count); Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); pool.Dispose(); } [TestMethod] public void WorkerPoolDisposeTest() { var pool = new WorkerPool(5, 20); Assert.AreEqual(5, pool.PoolSize); pool.Dispose(); Thread.Sleep(500); Assert.AreEqual(0, pool.PoolSize); pool.Dispose(); } [TestMethod] public void MTQueueTest() { var queue = new MTQueue<int>(); int res; queue.Enqueue(10); Assert.IsTrue(queue.TryDequeue(out res)); Assert.AreEqual(10, res); Assert.IsFalse(queue.TryDequeue(out res)); for (int i = 0; i < 1000; i++) queue.Enqueue(i); for (int i = 0; i < 1000; i++) { queue.TryDequeue(out res); Assert.AreEqual(i, res); } int writers = 0; int readers = 0; var stop = new ManualResetEvent(false); int total = 0; const int itemsPerWriter = 10000; const int writersCount = 10; for (int i = 0; i < writersCount; i++) { Interlocked.Increment(ref writers); AsyncPool .RunThread(() => { for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); } return 1; }) .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); } for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); AsyncPool .RunThread(() => { int t; do { while (queue.TryDequeue(out t)) Interlocked.Add(ref total, t); } while (writers > 0); return 1; }) .On(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); }, PromiseEventType.All); } stop.WaitOne(); Assert.AreEqual(100000, total); } [TestMethod] public void AsyncQueueTest() { var queue = new AsyncQueue<int>(); int res; queue.Enqueue(10); Assert.IsTrue(queue.TryDequeue(out res)); Assert.AreEqual(10, res); Assert.IsFalse(queue.TryDequeue(out res)); for (int i = 0; i < 1000; i++) queue.Enqueue(i); for (int i = 0; i < 1000; i++) { queue.TryDequeue(out res); Assert.AreEqual(i, res); } const int count = 10000000; int res1 = 0, res2 = 0; var t1 = Environment.TickCount; AsyncPool.RunThread( () => { for (var i = 0; i < count; i++) queue.Enqueue(1); Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { for (var i = 0; i < count; i++) queue.Enqueue(2); Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { int temp; int i = 0; while (i < count) if (queue.TryDequeue(out temp)) { i++; res1 += temp; } Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); }, () => { int temp; int i = 0; while (i < count) if (queue.TryDequeue(out temp)) { i++; res2 += temp; } Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) .Bundle() .Join(); Assert.AreEqual(count * 3, res1 + res2); Console.WriteLine( "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count ); } [TestMethod] public void AsyncQueueBatchTest() { var queue = new AsyncQueue<int>(); const int wBatch = 29; const int wCount = 400000; const int total = wBatch * wCount * 2; const int summ = wBatch * wCount * 3; int r1 = 0, r2 = 0; const int rBatch = 111; int read = 0; var t1 = Environment.TickCount; AsyncPool.RunThread( () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 1; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 2; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[rBatch]; while(read < total) { int actual; if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { for(int i=0; i< actual; i++) r1 += buffer[i]; Interlocked.Add(ref read, actual); } } Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[rBatch]; while(read < total) { int actual; if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { for(int i=0; i< actual; i++) r2 += buffer[i]; Interlocked.Add(ref read, actual); } } Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); Console.WriteLine( "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, r1, r2, r1 + r2, total ); } [TestMethod] public void AsyncQueueChunkDequeueTest() { var queue = new AsyncQueue<int>(); const int wBatch = 31; const int wCount = 200000; const int total = wBatch * wCount * 3; const int summ = wBatch * wCount * 6; int r1 = 0, r2 = 0; const int rBatch = 1024; int read = 0; var t1 = Environment.TickCount; AsyncPool.RunThread( () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 1; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 2; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 3; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[rBatch]; int count = 1; double avgchunk = 0; while(read < total) { int actual; if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { for(int i=0; i< actual; i++) r2 += buffer[i]; Interlocked.Add(ref read, actual); avgchunk = avgchunk*(count-1)/count + actual/(double)count; count ++; } } Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); } ) .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); Console.WriteLine( "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, r1, r2, r1 + r2, total ); } [TestMethod] public void AsyncQueueDrainTest() { var queue = new AsyncQueue<int>(); const int wBatch = 11; const int wCount = 200000; const int total = wBatch * wCount * 3; const int summ = wBatch * wCount * 3; int r1 = 0, r2 = 0; const int rBatch = 11; int read = 0; var t1 = Environment.TickCount; AsyncPool.RunThread( () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 1; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); }, () => { for(int i =0; i < wCount * wBatch; i++) queue.Enqueue(1); Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); }, () => { var buffer = new int[wBatch]; for(int i = 0; i<wBatch; i++) buffer[i] = 1; for(int i =0; i < wCount; i++) queue.EnqueueRange(buffer,0,wBatch); Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); }, /*() => { int temp; int count = 0; while (read < total) if (queue.TryDequeue(out temp)) { count++; r1 += temp; Interlocked.Increment(ref read); } Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); },*/ /*() => { var buffer = new int[rBatch]; var count = 0; while(read < total) { int actual; if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { for(int i=0; i< actual; i++) r1 += buffer[i]; Interlocked.Add(ref read, actual); count += actual; } } Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); },*/ () => { var count = 0; while(read < total) { var buffer = queue.Drain(); for(int i=0; i< buffer.Length; i++) r1 += buffer[i]; Interlocked.Add(ref read, buffer.Length); count += buffer.Length; } Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); }, () => { var count = 0; while(read < total) { var buffer = queue.Drain(); for(int i=0; i< buffer.Length; i++) r2 += buffer[i]; Interlocked.Add(ref read, buffer.Length); count += buffer.Length; } Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); } ) .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); Console.WriteLine( "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, r1, r2, r1 + r2, total ); } [TestMethod] public void ParallelMapTest() { const int count = 100000; var args = new double[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = rand.NextDouble(); var t = Environment.TickCount; var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); t = Environment.TickCount; for (int i = 0; i < count; i++) Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); } [TestMethod] public void ChainedMapTest() { using (var pool = new WorkerPool()) { const int count = 10000; var args = new double[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = rand.NextDouble(); var t = Environment.TickCount; var res = args .ChainedMap( // Analysis disable once AccessToDisposedClosure x => pool.Invoke( () => Math.Sin(x * x) ), 4 ) .Join(); Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); t = Environment.TickCount; for (int i = 0; i < count; i++) Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); } } [TestMethod] public void ParallelForEachTest() { const int count = 100000; var args = new int[count]; var rand = new Random(); for (int i = 0; i < count; i++) args[i] = (int)(rand.NextDouble() * 100); int result = 0; var t = Environment.TickCount; args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); int result2 = 0; t = Environment.TickCount; for (int i = 0; i < count; i++) result2 += args[i]; Assert.AreEqual(result2, result); Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); } [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) var step1 = PromiseHelper .Sleep(200, "Alan") .On(() => flags[0] = true, PromiseEventType.Cancelled); var p = step1 .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) .Then(y => y) .On(() => flags[1] = true, PromiseEventType.Cancelled) ) .On(() => flags[2] = true, PromiseEventType.Cancelled); step1.Join(); p.Cancel(); try { Assert.AreEqual(p.Join(), "Hi, Alan"); Assert.Fail("Shouldn't get here"); } catch (OperationCanceledException) { } Assert.IsFalse(flags[0]); Assert.IsTrue(flags[1]); Assert.IsTrue(flags[2]); } [TestMethod] public void ChainedCancel1Test() { // при отмене сцепленной асинхронной операции все обещание должно // завершаться ошибкой OperationCanceledException var p = PromiseHelper .Sleep(1, "Hi, HAL!") .Then(x => { // запускаем две асинхронные операции var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!"); // вторая операция отменяет первую до завершения PromiseHelper .Sleep(100, "HAL, STOP!") .Then(result.Cancel); return result; }); try { p.Join(); } catch (TargetInvocationException err) { Assert.IsTrue(err.InnerException is OperationCanceledException); } } [TestMethod] public void ChainedCancel2Test() { // при отмене цепочки обещаний, вложенные операции также должны отменяться var pSurvive = new Promise<bool>(); var hemStarted = new Signal(); var p = PromiseHelper .Sleep(1, "Hi, HAL!") .Chain(() => { hemStarted.Set(); // запускаем две асинхронные операции var result = PromiseHelper .Sleep(2000, "HEM ENABLED!!!") .Then(() => pSurvive.Resolve(false)); result .On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled); return result; }); hemStarted.Wait(); p.Cancel(); try { p.Join(); Assert.Fail(); } catch (OperationCanceledException) { } Assert.IsTrue(pSurvive.Join()); } [TestMethod] public void SharedLockTest() { var l = new SharedLock(); int shared = 0; int exclusive = 0; var s1 = new Signal(); var log = new AsyncQueue<string>(); try { AsyncPool.RunThread( () => { log.Enqueue("Reader #1 started"); try { l.LockShared(); log.Enqueue("Reader #1 lock got"); if (Interlocked.Increment(ref shared) == 2) s1.Set(); s1.Wait(); log.Enqueue("Reader #1 finished"); Interlocked.Decrement(ref shared); } finally { l.Release(); log.Enqueue("Reader #1 lock released"); } }, () => { log.Enqueue("Reader #2 started"); try { l.LockShared(); log.Enqueue("Reader #2 lock got"); if (Interlocked.Increment(ref shared) == 2) s1.Set(); s1.Wait(); log.Enqueue("Reader #2 upgrading to writer"); Interlocked.Decrement(ref shared); l.Upgrade(); log.Enqueue("Reader #2 upgraded"); Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); Assert.AreEqual(0, shared); log.Enqueue("Reader #2 finished"); Interlocked.Decrement(ref exclusive); } finally { l.Release(); log.Enqueue("Reader #2 lock released"); } }, () => { log.Enqueue("Writer #1 started"); try { l.LockExclusive(); log.Enqueue("Writer #1 got the lock"); Assert.AreEqual(1, Interlocked.Increment(ref exclusive)); Interlocked.Decrement(ref exclusive); log.Enqueue("Writer #1 is finished"); } finally { l.Release(); log.Enqueue("Writer #1 lock released"); } } ).Bundle().Join(1000); log.Enqueue("Done"); } catch(Exception error) { log.Enqueue(error.Message); throw; } finally { foreach (var m in log) Console.WriteLine(m); } } } }