changeset 124:a336cb13c6a9 v2

major update, added Drain mathod to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents f4d6ea6969cc
children f803565868a4
files Implab.Test/AsyncTests.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs Implab/PromiseExtensions.cs MonoPlay/Program.cs
diffstat 6 files changed, 247 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -249,7 +249,7 @@
             for (int i = 0; i < writersCount; i++) {
                 Interlocked.Increment(ref writers);
                 AsyncPool
-                    .InvokeNewThread(() => {
+                    .RunThread(() => {
                         for (int ii = 0; ii < itemsPerWriter; ii++) {
                             queue.Enqueue(1);
                         }
@@ -261,7 +261,7 @@
             for (int i = 0; i < 10; i++) {
                 Interlocked.Increment(ref readers);
                 AsyncPool
-                    .InvokeNewThread(() => {
+                    .RunThread(() => {
                         int t;
                         do {
                             while (queue.TryDequeue(out t))
@@ -336,7 +336,7 @@
                     Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(count * 3, res1 + res2);
@@ -414,7 +414,7 @@
                     Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(summ , r1 + r2);
@@ -490,7 +490,110 @@
                     Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
                 }
             )
-                .Combine()
+                .Bundle()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueDrainTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 11;
+            const int wCount = 200000;
+            const int total = wBatch * wCount * 3;
+            const int summ = wBatch * wCount * 3;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 11;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for(int i =0; i < wCount * wBatch; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                /*() => {
+                    int temp;
+                    int count = 0;
+                    while (read < total)
+                        if (queue.TryDequeue(out temp)) {
+                            count++;
+                            r1 += temp;
+                            Interlocked.Increment(ref read);
+                        }
+                    Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
+                },*/
+                /*() => {
+                    var buffer = new int[rBatch];
+                    var count = 0;
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                            count += actual;
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },*/
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r1 += buffer[i];
+                            Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
+                },
+                () => {
+                    var count = 0;
+                    while(read < total) {
+                        var buffer = queue.Drain();
+                        for(int i=0; i< buffer.Length; i++)
+                            r2 += buffer[i];
+                        Interlocked.Add(ref read, buffer.Length);
+                        count += buffer.Length;
+                    }
+                    Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
+                }
+            )
+                .Bundle()
                 .Join();
 
             Assert.AreEqual(summ , r1 + r2);
--- a/Implab/Parallels/ArrayTraits.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -162,7 +162,7 @@
             int slots = threads;
 
             // Analysis disable AccessToDisposedClosure
-            AsyncPool.InvokeNewThread<int>(() => {
+            AsyncPool.RunThread<int>(() => {
                 for (int i = 0; i < source.Length; i++) {
                     if(promise.IsResolved)
                         break; // stop processing in case of error or cancellation
--- a/Implab/Parallels/AsyncPool.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab/Parallels/AsyncPool.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -31,7 +31,7 @@
 			return p;
 		}
 
-        public static IPromise<T> InvokeNewThread<T>(Func<T> func) {
+        public static IPromise<T> RunThread<T>(Func<T> func) {
             var p = new Promise<T>();
 
             var caller = TraceContext.Instance.CurrentOperation;
@@ -53,7 +53,7 @@
         }
 
 
-        public static IPromise InvokeNewThread(Action func) {
+        public static IPromise RunThread(Action func) {
             var p = new Promise();
 
             var caller = TraceContext.Instance.CurrentOperation;
@@ -76,11 +76,11 @@
         }
 
         public static IPromise[] RunThread(params Action[] func) {
-            return func.Select(f => InvokeNewThread(f)).ToArray();
+            return func.Select(f => RunThread(f)).ToArray();
         }
 
         public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
-            return func.Select(f => InvokeNewThread(f)).ToArray();
+            return func.Select(f => RunThread(f)).ToArray();
         }
 	}
 }
--- a/Implab/Parallels/AsyncQueue.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -2,6 +2,7 @@
 using System.Collections.Generic;
 using System;
 using System.Collections;
+using System.Diagnostics;
 
 namespace Implab.Parallels {
     public class AsyncQueue<T> : IEnumerable<T> {
@@ -60,6 +61,16 @@
                 return true;
             }
 
+            /// <summary>
+            /// Prevents from allocating new space in the chunk and waits for all write operations to complete
+            /// </summary>
+            public void Commit() {
+                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
+
+                while (m_hi != actual)
+                    Thread.MemoryBarrier();
+            }
+
             public bool TryDequeue(out T value, out bool recycle) {
                 int low;
                 do {
@@ -359,76 +370,114 @@
 
             if (last != null)
                 last.next = chunk;
-            else
+            else {
                 m_first = chunk;
+            }
             return true;
         }
 
         void RecycleFirstChunk(Chunk first) {
             var next = first.next;
 
+            if (first != Interlocked.CompareExchange(ref m_first, next, first))
+                return;
+
             if (next == null) {
-                // looks like this is the last chunk
+
                 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                    /*while (first.next == null)
+                        Thread.MemoryBarrier();*/
+
                     // race
-                    // maybe someone already recycled this chunk
-                    // or a new chunk has been appedned to the queue
-
-                    return; // give up
+                    // someone already updated the tail, restore the pointer to the queue head
+                    m_first = first;
                 }
                 // the tail is updated
             }
 
             // we need to update the head
-            Interlocked.CompareExchange(ref m_first, next, first);
+            //Interlocked.CompareExchange(ref m_first, next, first);
             // if the head is already updated then give up
-            return;
+            //return;
 
         }
 
         public void Clear() {
             // start the new queue
-            var t = new Chunk(m_chunkSize);
-            Thread.MemoryBarrier();
-            m_last = t;
-            Thread.MemoryBarrier();
+            var chunk = new Chunk(m_chunkSize);
+
+            do {
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null) // nothing to clear
+                    return;
 
-            // make the new queue available to the readers, and stop the old one
-            m_first = t;
-            Thread.MemoryBarrier();
+                if (first == null || (first.next == null && first != last)) // inconcistency
+                    continue;
+
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 
+                    continue;// inconsistent
+
+                m_last = chunk;
+
+                return;
+
+            } while(true);
         }
 
         public T[] Drain() {
             // start the new queue
-            var t = new Chunk(m_chunkSize);
-            Thread.MemoryBarrier();
-            m_last = t;
-            Thread.MemoryBarrier();
-
-            // make the new queue available to the readers, and stop the old one
-            Chunk first;
+            var chunk = new Chunk(m_chunkSize);
 
             do {
-                first = m_first;
-            } while(first != Interlocked.CompareExchange(ref m_first
-            Thread.MemoryBarrier();
+                Thread.MemoryBarrier();
+                var first = m_first;
+                var last = m_last;
+
+                if (last == null)
+                    return new T[0];
+
+                if (first == null || (first.next == null && first != last))
+                    continue;
 
+                // here we will create inconsistency which will force others to spin
+                // and prevent from fetching. chunk.next = null
+                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
+                    continue;// inconsistent
 
+                last = Interlocked.Exchange(ref m_last, chunk);
+
+                return ReadChunks(first, last);
+
+            } while(true);
         }
             
-        T[] ReadChunks(Chunk chunk) {
+        T[] ReadChunks(Chunk chunk, object last) {
             var result = new List<T>();
             var buffer = new T[m_chunkSize];
             int actual;
             bool recycle;
             while (chunk != null) {
+                // ensure all write operations on the chunk are complete
+                chunk.Commit();
+
                 // we need to read the chunk using this way
                 // since some client still may completing the dequeue
                 // operation, such clients most likely won't get results
                 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
                     result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
 
-                chunk = chunk.next;
+                if (chunk == last) {
+                    chunk = null;
+                } else {
+                    while (chunk.next == null)
+                        Thread.MemoryBarrier();
+                    chunk = chunk.next;
+                }
             }
 
             return result.ToArray();
--- a/Implab/PromiseExtensions.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/Implab/PromiseExtensions.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -94,12 +94,18 @@
             return that;
         }
 
-        public static IPromise Combine(this ICollection<IPromise> that) {
+        public static IPromise Bundle(this ICollection<IPromise> that) {
             Safe.ArgumentNotNull(that, "that");
 
             int count = that.Count;
+            int errors = 0;
             var medium = new Promise();
 
+            medium.On(() => {
+                foreach(var p2 in that)
+                    p2.Cancel();
+            }, PromiseEventType.ErrorOrCancel);
+
             foreach (var p in that)
                 p.On(
                     () => {
@@ -107,15 +113,62 @@
                             medium.Resolve();
                     },
                     error => {
-                        throw new Exception("The dependency promise is failed", error);
+                        if (Interlocked.Increment(ref errors) == 1)
+                            medium.Reject(
+                                new Exception("The dependency promise is failed", error)
+                            );
                     },
                     () => {
-                        throw new OperationCanceledException("The dependency promise is cancelled");
+                        if (Interlocked.Increment(ref errors) == 1)
+                            medium.Reject(
+                                new Exception("The dependency promise is cancelled")
+                            );
                     }
                 );
 
             return medium;
         }
+
+        public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
+            Safe.ArgumentNotNull(that, "that");
+
+            int count = that.Count;
+            int errors = 0;
+            var medium = new Promise<T[]>();
+            var results = new T[that.Count];
+
+            medium.On(() => {
+                foreach(var p2 in that)
+                    p2.Cancel();
+            }, PromiseEventType.ErrorOrCancel);
+
+            int i = 0;
+            foreach (var p in that) {
+                var idx = i;
+                p.On(
+                    x => {
+                        results[idx] = x;
+                        if (Interlocked.Decrement(ref count) == 0)
+                            medium.Resolve(results);
+                    },
+                    error => {
+                        if (Interlocked.Increment(ref errors) == 1)
+                            medium.Reject(
+                                new Exception("The dependency promise is failed", error)
+                            );
+                    },
+                    () => {
+                        if (Interlocked.Increment(ref errors) == 1)
+                            medium.Reject(
+                                new Exception("The dependency promise is cancelled")
+                            );
+                    }
+                );
+                i++;
+            }
+
+            return medium;
+        }
             
         #if NET_4_5
 
--- a/MonoPlay/Program.cs	Tue Jan 13 01:42:38 2015 +0300
+++ b/MonoPlay/Program.cs	Thu Jan 15 02:43:14 2015 +0300
@@ -51,7 +51,7 @@
                     Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();
 
             Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
@@ -107,7 +107,7 @@
 
                 }
             )
-                .Combine()
+                .Bundle()
                 .Join();