changeset 122:0c8685c8b56b v2

minor fixes and improvements of AsyncQueue, additional tests
author cin
date Mon, 12 Jan 2015 22:20:45 +0300
parents 62d2f1e98c4e
children f4d6ea6969cc
files Implab.Test/AsyncTests.cs Implab/Parallels/AsyncQueue.cs MonoPlay/Program.cs
diffstat 3 files changed, 96 insertions(+), 31 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Mon Jan 12 18:19:41 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Mon Jan 12 22:20:45 2015 +0300
@@ -430,6 +430,82 @@
         }
 
         [TestMethod]
+        public void AsyncQueueChunkDequeueTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 31;
+            const int wCount = 200000;
+            const int total = wBatch * wCount * 3;
+            const int summ = wBatch * wCount * 6;
+
+            int r1 = 0, r2 = 0;
+            const int rBatch = 1024;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 1;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 2;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for(int i = 0; i<wBatch; i++)
+                        buffer[i] = 3;
+
+                    for(int i =0; i < wCount; i++)
+                        queue.EnqueueRange(buffer,0,wBatch);
+                    Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
+                    int count = 1;
+                    double avgchunk = 0;
+                    while(read < total) {
+                        int actual;
+                        if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) {
+                            for(int i=0; i< actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                            avgchunk = avgchunk*(count-1)/count + actual/(double)count;
+                            count ++;
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
+                }
+            )
+                .Combine()
+                .Join();
+
+            Assert.AreEqual(summ , r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
+        }
+
+        [TestMethod]
         public void ParallelMapTest() {
 
             const int count = 100000;
--- a/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 18:19:41 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 22:20:45 2015 +0300
@@ -78,42 +78,31 @@
             }
 
             public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
-                int alloc;
-                int allocSize;
-
-                // in case the batch size is larger than a free space in chunk
-                // tailGap is used to over allocate the space in the chunk to
-                // get exclusive permission on creation of the next one.
-                int tailGap = 0;
-
-                do {
-                    alloc = m_alloc;
+                //int alloc;
+                //int allocSize;
 
-                    if (alloc > m_size) {
-                        // the chunk is full and someone already
-                        // creating the new one
-                        enqueued = 0; // nothing was added
-                        extend = false; // the caller shouldn't try to extend the queue
-                        return false; // nothing was added
-                    }
+                var alloc = Interlocked.Add(ref m_alloc, length) - length;
+                if (alloc > m_size) {
+                    // the chunk is full and someone already
+                    // creating the new one
+                    enqueued = 0; // nothing was added
+                    extend = false; // the caller shouldn't try to extend the queue
+                    return false; // nothing was added
+                }
 
-                    allocSize = Math.Min(m_size - alloc, length);
-                    if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
-                        tailGap = 1; // overallocate space to get exclusive permission to extend queue
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
-                    
-                extend = tailGap != 0;
-                enqueued = allocSize;
+                enqueued = Math.Min(m_size - alloc, length);
+                extend = length > enqueued;
 
-                // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
-                if (alloc == m_size)
+                if (enqueued == 0)
                     return false;
 
-                Array.Copy(batch, offset, m_data, alloc, allocSize);
 
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
+                Array.Copy(batch, offset, m_data, alloc, enqueued);
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
                     // spin wait for commit
                 }
+
                 return true;
             }
 
@@ -169,7 +158,7 @@
                 if (extend || last == null) {
                     var chunk = new Chunk(m_chunkSize, value);
                     if (EnqueueChunk(last, chunk))
-                        break;
+                        break; // success! exit!
                     last = m_last;
                 } else {
                     while (last == m_last) {
@@ -326,7 +315,7 @@
         /// Tries to dequeue all remaining data in the first chunk.
         /// </summary>
         /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
-        /// <param name="buffer">The buffer to which data will be written.</param>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
         /// <param name="offset">The offset in the buffer at which the data will be written.</param>
         /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
         /// <param name="dequeued">The actual amount of the dequeued data.</param>
--- a/MonoPlay/Program.cs	Mon Jan 12 18:19:41 2015 +0300
+++ b/MonoPlay/Program.cs	Mon Jan 12 22:20:45 2015 +0300
@@ -54,7 +54,7 @@
                 .Combine()
                 .Join();
 
-            Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
+            Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
 
             var t2 = Environment.TickCount;
             Console.WriteLine("MTQueue: {0} ms", t2 - t1);