# HG changeset patch # User cin # Date 1383745752 -14400 # Node ID e943453e5039617dcc330948d1aa9a17b3ce4c70 # Parent b0feb5b9ad1c91f52916ac22904a2720d2e24eb6 Implemented interllocked queue fixed promise syncronization diff -r b0feb5b9ad1c -r e943453e5039 Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Wed Nov 06 17:49:12 2013 +0400 @@ -137,6 +137,77 @@ } [TestMethod] + public void MTQueueTest() { + var queue = new MTQueue(); + var pool = new WorkerPool(5, 20); + + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + int itemsPerWriter = 1000; + int writersCount = 3; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + Console.WriteLine("Started writer: {0}", wn); + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + Thread.Sleep(1); + } + Console.WriteLine("Stopped writer: {0}", wn); + return 1; + }) + .Then(x => Interlocked.Decrement(ref writers) ); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + int t; + Console.WriteLine("Started reader: {0}", wn); + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + Thread.Sleep(0); + } while (writers > 0); + Console.WriteLine("Stopped reader: {0}", wn); + return 1; + }) + .Then(x => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(itemsPerWriter * writersCount, total); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3]; diff -r b0feb5b9ad1c -r e943453e5039 Implab.suo Binary file Implab.suo has changed diff -r b0feb5b9ad1c -r e943453e5039 Implab/Implab.csproj --- a/Implab/Implab.csproj Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Implab.csproj Wed Nov 06 17:49:12 2013 +0400 @@ -38,6 +38,7 @@ + diff -r b0feb5b9ad1c -r e943453e5039 Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Parallels/AsyncPool.cs Wed Nov 06 17:49:12 2013 +0400 @@ -16,13 +16,30 @@ ThreadPool.QueueUserWorkItem(param => { try { - p.Resolve(func()); + p.Resolve(func()); } catch(Exception e) { p.Reject(e); } }); return p; - } + } + + public static Promise InvokeNewThread(Func func) { + var p = new Promise(); + + var worker = new Thread(() => { + try { + p.Resolve(func()); + } catch (Exception e) { + p.Reject(e); + } + }); + worker.IsBackground = true; + + worker.Start(); + + return p; + } } } diff -r b0feb5b9ad1c -r e943453e5039 Implab/Parallels/MTQueue.cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/MTQueue.cs Wed Nov 06 17:49:12 2013 +0400 @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public class MTQueue { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public Node next; + } + + Node m_first; + Node m_last; + + public void Enqueue(T value) { + var last = m_last; + var next = new Node(value); + + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out T value) { + Node first; + Node next = null; + value = default(T); + + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is inconsistent situation which means that the queue is empty + if (m_last == null) + return false; + // tail has been changed, that means that we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null + // but the writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null, but if it already has been + // updated by a writer then we should just give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } else { + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } + } while (true); + + value = first.value; + return true; + } + } +} diff -r b0feb5b9ad1c -r e943453e5039 Implab/Promise.cs --- a/Implab/Promise.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Promise.cs Wed Nov 06 17:49:12 2013 +0400 @@ -88,7 +88,7 @@ /// Результат выполнения. /// Данное обещание уже выполнено public void Resolve(T result) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved) @@ -106,7 +106,7 @@ /// Исключение возникшее при выполнении операции /// Данное обещание уже выполнено public void Reject(Exception error) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved)