changeset 14:e943453e5039 promises

Implemented interllocked queue fixed promise syncronization
author cin
date Wed, 06 Nov 2013 17:49:12 +0400
parents b0feb5b9ad1c
children 0f982f9b7d4d
files Implab.Test/AsyncTests.cs Implab.suo Implab/Implab.csproj Implab/Parallels/AsyncPool.cs Implab/Parallels/MTQueue.cs Implab/Promise.cs
diffstat 6 files changed, 167 insertions(+), 4 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Wed Nov 06 01:07:55 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Wed Nov 06 17:49:12 2013 +0400
@@ -137,6 +137,77 @@
         }
 
         [TestMethod]
+        public void MTQueueTest() {
+            var queue = new MTQueue<int>();
+            var pool = new WorkerPool(5, 20);
+
+            int res;
+
+            queue.Enqueue(10);
+            Assert.IsTrue(queue.TryDequeue(out res));
+            Assert.AreEqual(10, res);
+            Assert.IsFalse(queue.TryDequeue(out res));
+
+            for (int i = 0; i < 1000; i++)
+                queue.Enqueue(i);
+
+            for (int i = 0; i < 1000; i++) {
+                queue.TryDequeue(out res);
+                Assert.AreEqual(i, res);
+            }
+
+            int writers = 0;
+            int readers = 0;
+            var stop = new ManualResetEvent(false);
+            int total = 0;
+
+            int itemsPerWriter = 1000;
+            int writersCount = 3;
+
+            for (int i = 0; i < writersCount; i++) {
+                Interlocked.Increment(ref writers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        Console.WriteLine("Started writer: {0}", wn);
+                        for (int ii = 0; ii < itemsPerWriter; ii++) {
+                            queue.Enqueue(1);
+                            Thread.Sleep(1);
+                        }
+                        Console.WriteLine("Stopped writer: {0}", wn);
+                        return 1;
+                    })
+                    .Then(x => Interlocked.Decrement(ref writers) );
+            }
+            
+            for (int i = 0; i < 10; i++) {
+                Interlocked.Increment(ref readers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        int t;
+                        Console.WriteLine("Started reader: {0}", wn);
+                        do {
+                            while (queue.TryDequeue(out t))
+                                Interlocked.Add(ref total, t);
+                            Thread.Sleep(0);
+                        } while (writers > 0);
+                        Console.WriteLine("Stopped reader: {0}", wn);
+                        return 1;
+                    })
+                    .Then(x => {
+                        Interlocked.Decrement(ref readers);
+                        if (readers == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(itemsPerWriter * writersCount, total);
+        }
+
+        [TestMethod]
         public void ComplexCase1Test() {
             var flags = new bool[3];
 
Binary file Implab.suo has changed
--- a/Implab/Implab.csproj	Wed Nov 06 01:07:55 2013 +0400
+++ b/Implab/Implab.csproj	Wed Nov 06 17:49:12 2013 +0400
@@ -38,6 +38,7 @@
     <Compile Include="IPromise.cs" />
     <Compile Include="ITaskController.cs" />
     <Compile Include="ManagedPromise.cs" />
+    <Compile Include="Parallels\MTQueue.cs" />
     <Compile Include="Parallels\WorkerPool.cs" />
     <Compile Include="PromiseState.cs" />
     <Compile Include="TaskController.cs" />
--- a/Implab/Parallels/AsyncPool.cs	Wed Nov 06 01:07:55 2013 +0400
+++ b/Implab/Parallels/AsyncPool.cs	Wed Nov 06 17:49:12 2013 +0400
@@ -16,13 +16,30 @@
 
 			ThreadPool.QueueUserWorkItem(param => {
 				try {
-					p.Resolve(func());
+					p.Resolve(func());
 				} catch(Exception e) {
 					p.Reject(e);
 				}
 			});
 
 			return p;
-		}
+		}
+
+        public static Promise<T> InvokeNewThread<T>(Func<T> func) {
+            var p = new Promise<T>();
+
+            var worker = new Thread(() => {
+                try {
+                    p.Resolve(func());
+                } catch (Exception e) {
+                    p.Reject(e);
+                }
+            });
+            worker.IsBackground = true;
+
+            worker.Start();
+
+            return p;
+        }
 	}
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/MTQueue.cs	Wed Nov 06 17:49:12 2013 +0400
@@ -0,0 +1,74 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public class MTQueue<T> {
+        class Node {
+            public Node(T value) {
+                this.value = value;
+            }
+            public readonly T value;
+            public Node next;
+        }
+
+        Node m_first;
+        Node m_last;
+
+        public void Enqueue(T value) {
+            var last = m_last;
+            var next = new Node(value);
+
+            while (last != Interlocked.CompareExchange(ref m_last, next, last))
+                last = m_last;
+
+            if (last != null)
+                last.next = next;
+            else
+                m_first = next;
+        }
+
+        public bool TryDequeue(out T value) {
+            Node first;
+            Node next = null;
+            value = default(T);
+
+            do {
+                first = m_first;
+                if (first == null)
+                    return false;
+                next = first.next;
+                if (next == null) {
+                    // this is the last element,
+                    // then try to update tail
+                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                        // this is inconsistent situation which means that the queue is empty
+                        if (m_last == null)
+                            return false;
+                        // tail has been changed, that means that we need to restart
+                        continue; 
+                    }
+
+                    // tail succesfully updated and first.next will never be changed
+                    // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
+                    // but the writer may update the m_first since the m_last is null
+
+                    // so we need to fix inconsistency by setting m_first to null, but if it already has been
+                    // updated by a writer then we should just give up
+                    Interlocked.CompareExchange(ref m_first, null, first);
+                    break;
+
+                } else {
+                        if (first == Interlocked.CompareExchange(ref m_first, next, first))
+                            // head succesfully updated
+                            break;
+                }
+            } while (true);
+
+            value = first.value;
+            return true;
+        }
+    }
+}
--- a/Implab/Promise.cs	Wed Nov 06 01:07:55 2013 +0400
+++ b/Implab/Promise.cs	Wed Nov 06 17:49:12 2013 +0400
@@ -88,7 +88,7 @@
         /// <param name="result">Результат выполнения.</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Resolve(T result) {
-            lock (this) {
+            lock (m_lock) {
                 if (m_state == PromiseState.Cancelled)
                     return;
                 if (m_state != PromiseState.Unresolved)
@@ -106,7 +106,7 @@
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
-            lock (this) {
+            lock (m_lock) {
                 if (m_state == PromiseState.Cancelled)
                     return;
                 if (m_state != PromiseState.Unresolved)