diff Implab.Test/AsyncTests.cs @ 14:e943453e5039 promises

Implemented interllocked queue fixed promise syncronization
author cin
date Wed, 06 Nov 2013 17:49:12 +0400
parents b0feb5b9ad1c
children 0f982f9b7d4d
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Nov 06 01:07:55 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Wed Nov 06 17:49:12 2013 +0400
@@ -137,6 +137,77 @@
         }
 
         [TestMethod]
+        public void MTQueueTest() {
+            var queue = new MTQueue<int>();
+            var pool = new WorkerPool(5, 20);
+
+            int res;
+
+            queue.Enqueue(10);
+            Assert.IsTrue(queue.TryDequeue(out res));
+            Assert.AreEqual(10, res);
+            Assert.IsFalse(queue.TryDequeue(out res));
+
+            for (int i = 0; i < 1000; i++)
+                queue.Enqueue(i);
+
+            for (int i = 0; i < 1000; i++) {
+                queue.TryDequeue(out res);
+                Assert.AreEqual(i, res);
+            }
+
+            int writers = 0;
+            int readers = 0;
+            var stop = new ManualResetEvent(false);
+            int total = 0;
+
+            int itemsPerWriter = 1000;
+            int writersCount = 3;
+
+            for (int i = 0; i < writersCount; i++) {
+                Interlocked.Increment(ref writers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        Console.WriteLine("Started writer: {0}", wn);
+                        for (int ii = 0; ii < itemsPerWriter; ii++) {
+                            queue.Enqueue(1);
+                            Thread.Sleep(1);
+                        }
+                        Console.WriteLine("Stopped writer: {0}", wn);
+                        return 1;
+                    })
+                    .Then(x => Interlocked.Decrement(ref writers) );
+            }
+            
+            for (int i = 0; i < 10; i++) {
+                Interlocked.Increment(ref readers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        int t;
+                        Console.WriteLine("Started reader: {0}", wn);
+                        do {
+                            while (queue.TryDequeue(out t))
+                                Interlocked.Add(ref total, t);
+                            Thread.Sleep(0);
+                        } while (writers > 0);
+                        Console.WriteLine("Stopped reader: {0}", wn);
+                        return 1;
+                    })
+                    .Then(x => {
+                        Interlocked.Decrement(ref readers);
+                        if (readers == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(itemsPerWriter * writersCount, total);
+        }
+
+        [TestMethod]
         public void ComplexCase1Test() {
             var flags = new bool[3];