Mercurial > pub > ImplabNet
diff Implab.Test/AsyncTests.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | d4e38929ce36 |
children | 62d2f1e98c4e |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab.Test/AsyncTests.cs Sun Jan 11 19:13:02 2015 +0300 @@ -72,7 +72,7 @@ p.Cancel(); var p2 = p - .Cancelled(() => { + .Cancelled<bool>(() => { throw new ApplicationException("CANCELLED"); }) .Error(e => true); @@ -195,11 +195,11 @@ .Invoke(() => 1) .Then(x => Interlocked.Add(ref count, x)) .Then(x => Math.Log10(x)) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref pending); if (pending == 0) stop.Set(); - }); + }, PromiseEventType.All); } stop.WaitOne(); @@ -255,7 +255,7 @@ } return 1; }) - .Anyway(() => Interlocked.Decrement(ref writers)); + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); } for (int i = 0; i < 10; i++) { @@ -269,11 +269,72 @@ } while (writers > 0); return 1; }) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); - }); + }, PromiseEventType.All); + } + + stop.WaitOne(); + + Assert.AreEqual(100000, total); + } + + [TestMethod] + public void AsyncQueueTest() { + var queue = new AsyncQueue<int>(); + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + const int itemsPerWriter = 10000; + const int writersCount = 10; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + AsyncPool + .InvokeNewThread(() => { + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + } + return 1; + }) + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + AsyncPool + .InvokeNewThread(() => { + int t; + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + } while (writers > 0); + return 1; + }) + .On(() => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }, PromiseEventType.All); } stop.WaitOne(); @@ -371,15 +432,15 @@ var step1 = PromiseHelper .Sleep(200, "Alan") - .Cancelled(() => flags[0] = true); + .On(() => flags[0] = true, PromiseEventType.Cancelled); var p = step1 .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) .Then(y => y) - .Cancelled(() => flags[1] = true) + .On(() => flags[1] = true, PromiseEventType.Cancelled) ) - .Cancelled(() => flags[2] = true); + .On(() => flags[2] = true, PromiseEventType.Cancelled); step1.Join(); p.Cancel(); try {