Mercurial > pub > ImplabNet
changeset 14:e943453e5039 promises
Implemented interllocked queue
fixed promise syncronization
| author | cin | 
|---|---|
| date | Wed, 06 Nov 2013 17:49:12 +0400 | 
| parents | b0feb5b9ad1c | 
| children | 0f982f9b7d4d | 
| files | Implab.Test/AsyncTests.cs Implab.suo Implab/Implab.csproj Implab/Parallels/AsyncPool.cs Implab/Parallels/MTQueue.cs Implab/Promise.cs | 
| diffstat | 6 files changed, 167 insertions(+), 4 deletions(-) [+] | 
line wrap: on
 line diff
--- a/Implab.Test/AsyncTests.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Wed Nov 06 17:49:12 2013 +0400 @@ -137,6 +137,77 @@ } [TestMethod] + public void MTQueueTest() { + var queue = new MTQueue<int>(); + var pool = new WorkerPool(5, 20); + + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + int itemsPerWriter = 1000; + int writersCount = 3; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + Console.WriteLine("Started writer: {0}", wn); + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + Thread.Sleep(1); + } + Console.WriteLine("Stopped writer: {0}", wn); + return 1; + }) + .Then(x => Interlocked.Decrement(ref writers) ); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + int t; + Console.WriteLine("Started reader: {0}", wn); + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + Thread.Sleep(0); + } while (writers > 0); + Console.WriteLine("Stopped reader: {0}", wn); + return 1; + }) + .Then(x => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(itemsPerWriter * writersCount, total); + } + + [TestMethod] public void ComplexCase1Test() { var flags = new bool[3];
--- a/Implab/Implab.csproj Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Implab.csproj Wed Nov 06 17:49:12 2013 +0400 @@ -38,6 +38,7 @@ <Compile Include="IPromise.cs" /> <Compile Include="ITaskController.cs" /> <Compile Include="ManagedPromise.cs" /> + <Compile Include="Parallels\MTQueue.cs" /> <Compile Include="Parallels\WorkerPool.cs" /> <Compile Include="PromiseState.cs" /> <Compile Include="TaskController.cs" />
--- a/Implab/Parallels/AsyncPool.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Parallels/AsyncPool.cs Wed Nov 06 17:49:12 2013 +0400 @@ -16,13 +16,30 @@ ThreadPool.QueueUserWorkItem(param => { try { - p.Resolve(func()); + p.Resolve(func()); } catch(Exception e) { p.Reject(e); } }); return p; - } + } + + public static Promise<T> InvokeNewThread<T>(Func<T> func) { + var p = new Promise<T>(); + + var worker = new Thread(() => { + try { + p.Resolve(func()); + } catch (Exception e) { + p.Reject(e); + } + }); + worker.IsBackground = true; + + worker.Start(); + + return p; + } } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/MTQueue.cs Wed Nov 06 17:49:12 2013 +0400 @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public class MTQueue<T> { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public Node next; + } + + Node m_first; + Node m_last; + + public void Enqueue(T value) { + var last = m_last; + var next = new Node(value); + + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out T value) { + Node first; + Node next = null; + value = default(T); + + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is inconsistent situation which means that the queue is empty + if (m_last == null) + return false; + // tail has been changed, that means that we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null + // but the writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null, but if it already has been + // updated by a writer then we should just give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } else { + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } + } while (true); + + value = first.value; + return true; + } + } +}
--- a/Implab/Promise.cs Wed Nov 06 01:07:55 2013 +0400 +++ b/Implab/Promise.cs Wed Nov 06 17:49:12 2013 +0400 @@ -88,7 +88,7 @@ /// <param name="result">Результат выполнения.</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Resolve(T result) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved) @@ -106,7 +106,7 @@ /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Reject(Exception error) { - lock (this) { + lock (m_lock) { if (m_state == PromiseState.Cancelled) return; if (m_state != PromiseState.Unresolved)
