Mercurial > pub > ImplabNet
changeset 124:a336cb13c6a9 v2
major update, added Drain mathod to AsyncQueue class
author | cin |
---|---|
date | Thu, 15 Jan 2015 02:43:14 +0300 (2015-01-14) |
parents | f4d6ea6969cc |
children | f803565868a4 |
files | Implab.Test/AsyncTests.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs Implab/PromiseExtensions.cs MonoPlay/Program.cs |
diffstat | 6 files changed, 247 insertions(+), 42 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab.Test/AsyncTests.cs Thu Jan 15 02:43:14 2015 +0300 @@ -249,7 +249,7 @@ for (int i = 0; i < writersCount; i++) { Interlocked.Increment(ref writers); AsyncPool - .InvokeNewThread(() => { + .RunThread(() => { for (int ii = 0; ii < itemsPerWriter; ii++) { queue.Enqueue(1); } @@ -261,7 +261,7 @@ for (int i = 0; i < 10; i++) { Interlocked.Increment(ref readers); AsyncPool - .InvokeNewThread(() => { + .RunThread(() => { int t; do { while (queue.TryDequeue(out t)) @@ -336,7 +336,7 @@ Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Assert.AreEqual(count * 3, res1 + res2); @@ -414,7 +414,7 @@ Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2); @@ -490,7 +490,110 @@ Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); } ) - .Combine() + .Bundle() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); + } + + [TestMethod] + public void AsyncQueueDrainTest() { + var queue = new AsyncQueue<int>(); + + const int wBatch = 11; + const int wCount = 200000; + const int total = wBatch * wCount * 3; + const int summ = wBatch * wCount * 3; + + int r1 = 0, r2 = 0; + const int rBatch = 11; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 1; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + for(int i =0; i < wCount * wBatch; i++) + queue.Enqueue(1); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 1; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); + }, + /*() => { + int temp; + int count = 0; + while (read < total) + if (queue.TryDequeue(out temp)) { + count++; + r1 += temp; + Interlocked.Increment(ref read); + } + Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); + },*/ + /*() => { + var buffer = new int[rBatch]; + var count = 0; + while(read < total) { + int actual; + if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, actual); + count += actual; + } + } + + Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); + },*/ + () => { + var count = 0; + while(read < total) { + var buffer = queue.Drain(); + for(int i=0; i< buffer.Length; i++) + r1 += buffer[i]; + Interlocked.Add(ref read, buffer.Length); + count += buffer.Length; + } + Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); + }, + () => { + var count = 0; + while(read < total) { + var buffer = queue.Drain(); + for(int i=0; i< buffer.Length; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, buffer.Length); + count += buffer.Length; + } + Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); + } + ) + .Bundle() .Join(); Assert.AreEqual(summ , r1 + r2);
--- a/Implab/Parallels/ArrayTraits.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab/Parallels/ArrayTraits.cs Thu Jan 15 02:43:14 2015 +0300 @@ -162,7 +162,7 @@ int slots = threads; // Analysis disable AccessToDisposedClosure - AsyncPool.InvokeNewThread<int>(() => { + AsyncPool.RunThread<int>(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation
--- a/Implab/Parallels/AsyncPool.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab/Parallels/AsyncPool.cs Thu Jan 15 02:43:14 2015 +0300 @@ -31,7 +31,7 @@ return p; } - public static IPromise<T> InvokeNewThread<T>(Func<T> func) { + public static IPromise<T> RunThread<T>(Func<T> func) { var p = new Promise<T>(); var caller = TraceContext.Instance.CurrentOperation; @@ -53,7 +53,7 @@ } - public static IPromise InvokeNewThread(Action func) { + public static IPromise RunThread(Action func) { var p = new Promise(); var caller = TraceContext.Instance.CurrentOperation; @@ -76,11 +76,11 @@ } public static IPromise[] RunThread(params Action[] func) { - return func.Select(f => InvokeNewThread(f)).ToArray(); + return func.Select(f => RunThread(f)).ToArray(); } public static IPromise<T>[] RunThread<T>(params Func<T>[] func) { - return func.Select(f => InvokeNewThread(f)).ToArray(); + return func.Select(f => RunThread(f)).ToArray(); } } }
--- a/Implab/Parallels/AsyncQueue.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Thu Jan 15 02:43:14 2015 +0300 @@ -2,6 +2,7 @@ using System.Collections.Generic; using System; using System.Collections; +using System.Diagnostics; namespace Implab.Parallels { public class AsyncQueue<T> : IEnumerable<T> { @@ -60,6 +61,16 @@ return true; } + /// <summary> + /// Prevents from allocating new space in the chunk and waits for all write operations to complete + /// </summary> + public void Commit() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); + + while (m_hi != actual) + Thread.MemoryBarrier(); + } + public bool TryDequeue(out T value, out bool recycle) { int low; do { @@ -359,76 +370,114 @@ if (last != null) last.next = chunk; - else + else { m_first = chunk; + } return true; } void RecycleFirstChunk(Chunk first) { var next = first.next; + if (first != Interlocked.CompareExchange(ref m_first, next, first)) + return; + if (next == null) { - // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + /*while (first.next == null) + Thread.MemoryBarrier();*/ + // race - // maybe someone already recycled this chunk - // or a new chunk has been appedned to the queue - - return; // give up + // someone already updated the tail, restore the pointer to the queue head + m_first = first; } // the tail is updated } // we need to update the head - Interlocked.CompareExchange(ref m_first, next, first); + //Interlocked.CompareExchange(ref m_first, next, first); // if the head is already updated then give up - return; + //return; } public void Clear() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); + var chunk = new Chunk(m_chunkSize); + + do { + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) // nothing to clear + return; - // make the new queue available to the readers, and stop the old one - m_first = t; - Thread.MemoryBarrier(); + if (first == null || (first.next == null && first != last)) // inconcistency + continue; + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + m_last = chunk; + + return; + + } while(true); } public T[] Drain() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); - - // make the new queue available to the readers, and stop the old one - Chunk first; + var chunk = new Chunk(m_chunkSize); do { - first = m_first; - } while(first != Interlocked.CompareExchange(ref m_first - Thread.MemoryBarrier(); + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) + return new T[0]; + + if (first == null || (first.next == null && first != last)) + continue; + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + last = Interlocked.Exchange(ref m_last, chunk); + + return ReadChunks(first, last); + + } while(true); } - T[] ReadChunks(Chunk chunk) { + T[] ReadChunks(Chunk chunk, object last) { var result = new List<T>(); var buffer = new T[m_chunkSize]; int actual; bool recycle; while (chunk != null) { + // ensure all write operations on the chunk are complete + chunk.Commit(); + // we need to read the chunk using this way // since some client still may completing the dequeue // operation, such clients most likely won't get results while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); - chunk = chunk.next; + if (chunk == last) { + chunk = null; + } else { + while (chunk.next == null) + Thread.MemoryBarrier(); + chunk = chunk.next; + } } return result.ToArray();
--- a/Implab/PromiseExtensions.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab/PromiseExtensions.cs Thu Jan 15 02:43:14 2015 +0300 @@ -94,12 +94,18 @@ return that; } - public static IPromise Combine(this ICollection<IPromise> that) { + public static IPromise Bundle(this ICollection<IPromise> that) { Safe.ArgumentNotNull(that, "that"); int count = that.Count; + int errors = 0; var medium = new Promise(); + medium.On(() => { + foreach(var p2 in that) + p2.Cancel(); + }, PromiseEventType.ErrorOrCancel); + foreach (var p in that) p.On( () => { @@ -107,15 +113,62 @@ medium.Resolve(); }, error => { - throw new Exception("The dependency promise is failed", error); + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is failed", error) + ); }, () => { - throw new OperationCanceledException("The dependency promise is cancelled"); + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is cancelled") + ); } ); return medium; } + + public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) { + Safe.ArgumentNotNull(that, "that"); + + int count = that.Count; + int errors = 0; + var medium = new Promise<T[]>(); + var results = new T[that.Count]; + + medium.On(() => { + foreach(var p2 in that) + p2.Cancel(); + }, PromiseEventType.ErrorOrCancel); + + int i = 0; + foreach (var p in that) { + var idx = i; + p.On( + x => { + results[idx] = x; + if (Interlocked.Decrement(ref count) == 0) + medium.Resolve(results); + }, + error => { + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is failed", error) + ); + }, + () => { + if (Interlocked.Increment(ref errors) == 1) + medium.Reject( + new Exception("The dependency promise is cancelled") + ); + } + ); + i++; + } + + return medium; + } #if NET_4_5
--- a/MonoPlay/Program.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/MonoPlay/Program.cs Thu Jan 15 02:43:14 2015 +0300 @@ -51,7 +51,7 @@ Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); } ) - .Combine() + .Bundle() .Join(); Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); @@ -107,7 +107,7 @@ } ) - .Combine() + .Bundle() .Join();