diff Implab.Test/AsyncTests.cs @ 124:a336cb13c6a9 v2

major update, added Drain mathod to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents 0c8685c8b56b
children f803565868a4
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -249,7 +249,7 @@
             for (int i = 0; i < writersCount; i++) {
                 Interlocked.Increment(ref writers);
                 AsyncPool
-                    .InvokeNewThread(() => {
+                    .RunThread(() => {
                         for (int ii = 0; ii < itemsPerWriter; ii++) {
                             queue.Enqueue(1);
                         }
@@ -261,7 +261,7 @@
             for (int i = 0; i < 10; i++) {
                 Interlocked.Increment(ref readers);
                 AsyncPool
-                    .InvokeNewThread(() => {
+                    .RunThread(() => {
                         int t;
                         do {
                             while (queue.TryDequeue(out t))
@@ -336,7 +336,7 @@
                     Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(count * 3, res1 + res2);
@@ -414,7 +414,7 @@
                     Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(summ , r1 + r2);
@@ -490,7 +490,110 @@
                     Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
                 }
             )
-                .Combine()
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueDrainTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 11;
+            const int wCount = 200000;
+            const int total = wBatch * wCount * 3;
+            const int summ = wBatch * wCount * 3;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 11;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for(int i =0; i < wCount * wBatch; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                /*() => {
+                    int temp;
+                    int count = 0;
+                    while (read < total)
+                        if (queue.TryDequeue(out temp)) {
+                            count++;
+                            r1 += temp;
+                            Interlocked.Increment(ref read);
+                        }
+                    Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
+                },*/
+                /*() => {
+                    var buffer = new int[rBatch];
+                    var count = 0;
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                            count += actual;
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },*/
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r1 += buffer[i];
+                            Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r2 += buffer[i];
+                        Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
+                }
+            )
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(summ , r1 + r2);