Mercurial > pub > ImplabNet
diff Implab.Test/AsyncTests.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 18:19:41 +0300 |
parents | 2573b562e328 |
children | 0c8685c8b56b |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Mon Jan 12 05:19:52 2015 +0300 +++ b/Implab.Test/AsyncTests.cs Mon Jan 12 18:19:41 2015 +0300 @@ -299,47 +299,134 @@ Assert.AreEqual(i, res); } - int writers = 0; - int readers = 0; - var stop = new ManualResetEvent(false); - int total = 0; + const int count = 10000000; - const int itemsPerWriter = 10000; - const int writersCount = 10; + int res1 = 0, res2 = 0; + var t1 = Environment.TickCount; - for (int i = 0; i < writersCount; i++) { - Interlocked.Increment(ref writers); - AsyncPool - .InvokeNewThread(() => { - for (int ii = 0; ii < itemsPerWriter; ii++) { - queue.Enqueue(1); + AsyncPool.RunThread( + () => { + for (var i = 0; i < count; i++) + queue.Enqueue(1); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + for (var i = 0; i < count; i++) + queue.Enqueue(2); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + int temp; + int i = 0; + while (i < count) + if (queue.TryDequeue(out temp)) { + i++; + res1 += temp; } - return 1; - }) - .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); - } + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); + }, + () => { + int temp; + int i = 0; + while (i < count) + if (queue.TryDequeue(out temp)) { + i++; + res2 += temp; + } + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(count * 3, res1 + res2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + res1, + res2, + res1 + res2, + count + ); + } + + [TestMethod] + public void AsyncQueueBatchTest() { + var queue = new AsyncQueue<int>(); + + const int wBatch = 29; + const int wCount = 400000; + const int total = wBatch * wCount * 2; + const int summ = wBatch * wCount * 3; - for (int i = 0; i < 10; i++) { - Interlocked.Increment(ref readers); - AsyncPool - .InvokeNewThread(() => { - int t; - do { - while (queue.TryDequeue(out t)) - Interlocked.Add(ref total, t); - } while (writers > 0); - return 1; - }) - .On(() => { - Interlocked.Decrement(ref readers); - if (readers == 0) - stop.Set(); - }, PromiseEventType.All); - } + int r1 = 0, r2 = 0; + const int rBatch = 111; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 1; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 2; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[rBatch]; - stop.WaitOne(); + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[rBatch]; - Assert.AreEqual(itemsPerWriter * writersCount, total); + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, actual); + } + } + + Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); } [TestMethod]