changeset 121:62d2f1e98c4e v2

working version of AsyncQueue and batch operations tests
author cin
date Mon, 12 Jan 2015 18:19:41 +0300
parents f1b897999260
children 0c8685c8b56b
files Implab.Test/AsyncTests.cs Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs Implab/Safe.cs MonoPlay/Program.cs
diffstat 5 files changed, 375 insertions(+), 75 deletions(-)
--- a/Implab.Test/AsyncTests.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -299,47 +299,134 @@
                 Assert.AreEqual(i, res);
             }
 
-            int writers = 0;
-            int readers = 0;
-            var stop = new ManualResetEvent(false);
-            int total = 0;
+            const int count = 10000000;
 
-            const int itemsPerWriter = 10000;
-            const int writersCount = 10;
+            int res1 = 0, res2 = 0;
+            var t1 = Environment.TickCount;
 
-            for (int i = 0; i < writersCount; i++) {
-                Interlocked.Increment(ref writers);
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        for (int ii = 0; ii < itemsPerWriter; ii++) {
-                            queue.Enqueue(1);
+            AsyncPool.RunThread(
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(1);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    for (var i = 0; i < count; i++)
+                        queue.Enqueue(2);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res1 += temp;
                         }
-                        return 1;
-                    })
-                    .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
-            }
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    int temp;
+                    int i = 0;
+                    while (i < count)
+                        if (queue.TryDequeue(out temp)) {
+                            i++;
+                            res2 += temp;
+                        }
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Combine()
+                .Join();
+
+            Assert.AreEqual(count * 3, res1 + res2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                res1,
+                res2,
+                res1 + res2,
+                count
+            );
+        }
+
+        [TestMethod]
+        public void AsyncQueueBatchTest() {
+            var queue = new AsyncQueue<int>();
+
+            const int wBatch = 29;
+            const int wCount = 400000;
+            const int total = wBatch * wCount * 2;
+            const int summ = wBatch * wCount * 3;
 
-            for (int i = 0; i < 10; i++) {
-                Interlocked.Increment(ref readers);
-                AsyncPool
-                    .InvokeNewThread(() => {
-                        int t;
-                        do {
-                            while (queue.TryDequeue(out t))
-                                Interlocked.Add(ref total, t);
-                        } while (writers > 0);
-                        return 1;
-                    })
-                    .On(() => {
-                        Interlocked.Decrement(ref readers);
-                        if (readers == 0)
-                            stop.Set();
-                    }, PromiseEventType.All);
-            }
+            int r1 = 0, r2 = 0;
+            const int rBatch = 111;
+            int read = 0;
+
+            var t1 = Environment.TickCount;
+
+            AsyncPool.RunThread(
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 1;
+
+                    for (int i = 0; i < wCount; i++)
+                        queue.EnqueueRange(buffer, 0, wBatch);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[wBatch];
+                    for (int i = 0; i < wBatch; i++)
+                        buffer[i] = 2;
+
+                    for (int i = 0; i < wCount; i++)
+                        queue.EnqueueRange(buffer, 0, wBatch);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
 
-            stop.WaitOne();
+                    while (read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r1 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
+                },
+                () => {
+                    var buffer = new int[rBatch];
 
-            Assert.AreEqual(itemsPerWriter * writersCount, total);
+                    while (read < total) {
+                        int actual;
+                        if (queue.TryDequeueRange(buffer, 0, rBatch, out actual)) {
+                            for (int i = 0; i < actual; i++)
+                                r2 += buffer[i];
+                            Interlocked.Add(ref read, actual);
+                        }
+                    }
+
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
+                }
+            )
+                .Combine()
+                .Join();
+
+            Assert.AreEqual(summ, r1 + r2);
+
+            Console.WriteLine(
+                "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
+                Environment.TickCount - t1,
+                r1,
+                r2,
+                r1 + r2,
+                total
+            );
         }
 
         [TestMethod]
--- a/Implab/Parallels/AsyncPool.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/Implab/Parallels/AsyncPool.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -75,11 +75,11 @@
             return p;
         }
 
-        public static IPromise[] ThreadRun(params Action[] func) {
+        public static IPromise[] RunThread(params Action[] func) {
             return func.Select(f => InvokeNewThread(f)).ToArray();
         }
 
-        public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
+        public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
             return func.Select(f => InvokeNewThread(f)).ToArray();
         }
 	}
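
For reference, a minimal sketch of how the renamed RunThread overloads are consumed,
mirroring the tests above. Combine() and Join() are assumed to be the existing Implab
promise helpers used in AsyncTests.cs; they are not part of this changeset.

    // start several workers on dedicated threads and wait for all of them
    IPromise[] workers = AsyncPool.RunThread(
        () => Console.WriteLine("worker #1"),
        () => Console.WriteLine("worker #2")
    );

    workers
        .Combine() // assumed: folds the IPromise[] into a single IPromise
        .Join();   // assumed: blocks until the combined promise resolves
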
--- a/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -27,6 +27,14 @@
                 m_data[0] = value;
             }
 
+            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+                m_size = size;
+                m_hi = length;
+                m_alloc = alloc;
+                m_data = new T[size];
+                Array.Copy(data, offset, m_data, 0, length);
+            }
+
             public int Low {
                 get { return m_low; }
             }
@@ -35,7 +43,7 @@
                 get { return m_hi; }
             }
 
-            public bool TryEnqueue(T value,out bool extend) {
+            public bool TryEnqueue(T value, out bool extend) {
                 var alloc = Interlocked.Increment(ref m_alloc) - 1;
 
                 if (alloc >= m_size) {
@@ -52,7 +60,7 @@
                 return true;
             }
 
-            public bool TryDequeue(out T value,out bool recycle) {
+            public bool TryDequeue(out T value, out bool recycle) {
                 int low;
                 do {
                     low = m_low;
@@ -73,27 +81,35 @@
                 int alloc;
                 int allocSize;
 
+                // In case the batch is larger than the free space left in the chunk,
+                // tailGap is used to over-allocate space in the chunk and thereby
+                // acquire exclusive permission to create the next one.
+                int tailGap = 0;
+
                 do {
                     alloc = m_alloc;
 
                     if (alloc > m_size) {
-                        enqueued = 0;
-                        extend = false;
-                        return false;
+                        // the chunk is full and someone else is
+                        // already creating a new one
+                        enqueued = 0; // nothing was added
+                        extend = false; // the caller shouldn't try to extend the queue
+                        return false; // the caller should wait for the new chunk instead
                     }
 
-                    allocSize = Math.Min(m_size - m_alloc, length);
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
+                    allocSize = Math.Min(m_size - alloc, length);
+                    if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
+                        tailGap = 1; // over-allocate space to get exclusive permission to extend the queue
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
+                    
+                extend = tailGap != 0;
+                enqueued = allocSize;
 
-                if (alloc == m_size) {
-                    enqueued = 0;
-                    extend = true;
+                // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
+                if (alloc == m_size)
                     return false;
-                }
 
                 Array.Copy(batch, offset, m_data, alloc, allocSize);
-                enqueued = allocSize;
-                extend = false;
 
                 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                     // spin wait for commit
@@ -101,12 +117,35 @@
                 return true;
             }
 
+            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
+                int low, hi, batchSize;
+
+                do {
+                    low = m_low;
+                    hi = m_hi;
+                    if (low >= hi) {
+                        dequeued = 0;
+                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        return false;
+                    }
+                    batchSize = Math.Min(hi - low, length);
+                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+
+                recycle = (low == m_size - batchSize);
+                dequeued = batchSize;
+
+                Array.Copy(m_data, low, buffer, offset, batchSize);
+
+                return true;
+            }
+
             public T GetAt(int pos) {
                 return m_data[pos];
             }
         }
 
         public const int DEFAULT_CHUNK_SIZE = 32;
+        public const int MAX_CHUNK_SIZE = 262144;
 
         readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
 
@@ -117,11 +156,15 @@
             m_last = m_first = new Chunk(m_chunkSize);
         }
 
+        /// <summary>
+        /// Adds the specified value to the queue.
+        /// </summary>
+        /// <param name="value">Tha value which will be added to the queue.</param>
         public void Enqueue(T value) {
             var last = m_last;
             // spin wait to the new chunk
             bool extend = true;
-            while(last == null || !last.TryEnqueue(value, out extend)) {
+            while (last == null || !last.TryEnqueue(value, out extend)) {
                 // try to extend queue
                 if (extend || last == null) {
                     var chunk = new Chunk(m_chunkSize, value);
@@ -129,14 +172,88 @@
                         break;
                     last = m_last;
                 } else {
-                    while (last != m_last) {
+                    while (last == m_last) {
                         Thread.MemoryBarrier();
-                        last = m_last;
                     }
+                    last = m_last;
                 }
             }
         }
 
+        /// <summary>
+        /// Adds the specified data to the queue.
+        /// </summary>
+        /// <param name="data">The buffer which contains the data to be enqueued.</param>
+        /// <param name="offset">The offset of the data in the buffer.</param>
+        /// <param name="length">The size of the data to read from the buffer.</param>
+        public void EnqueueRange(T[] data, int offset, int length) {
+            if (data == null)
+                throw new ArgumentNullException("data");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > data.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var last = m_last;
+
+            bool extend;
+            int enqueued;
+
+            while (length > 0) {
+                extend = true;
+                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                    length -= enqueued;
+                    offset += enqueued;
+                }
+
+                if (extend) {
+                    // there was not enough space in the chunk
+                    // or there were no chunks in the queue
+
+                    while (length > 0) {
+
+                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+
+                        var chunk = new Chunk(
+                            Math.Max(size, m_chunkSize),
+                            data,
+                            offset,
+                            size,
+                            length // length >= size
+                        );
+
+                        if (!EnqueueChunk(last, chunk)) {
+                            // looks like the queue has been updated, proceed from the beginning
+                            last = m_last; 
+                            break;
+                        }
+
+                        // we have successfully added the new chunk
+                        last = chunk;
+                        length -= size;
+                        offset += size;
+                    }
+                } else {
+                    // if we have enqueued all the data we don't need to extend the queue
+                    if (length == 0)
+                        break;
+
+                    // otherwise we need to wait while someone else is extending the queue:
+                    // spin wait
+                    while (last == m_last) {
+                        Thread.MemoryBarrier();
+                    }
+
+                    last = m_last;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Tries to retrieve the first element from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="value">The value of the dequeued element.</param>
         public bool TryDequeue(out T value) {
             var chunk = m_first;
             bool recycle;
@@ -161,6 +278,92 @@
             return false;
         }
 
+        /// <summary>
+        /// Tries to dequeue the specified amount of data from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">The maximum amount of data to be retrieved.</param>
+        /// <param name="dequeued">The actual amout of the retrieved data.</param>
+        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    offset += actual;
+                    length -= actual;
+                    dequeued += actual;
+                }
+
+                if (recycle) // this chunk is exhausted and can be recycled
+                    RecycleFirstChunk(chunk);
+                else if (actual == 0)
+                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+
+                if (length == 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return dequeued != 0;
+        }
+
+        /// <summary>
+        /// Tries to dequeue all remaining data in the first chunk.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
+        /// <param name="dequeued">The actual amount of the dequeued data.</param>
+        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    dequeued = actual;
+                }
+
+                if (recycle) // this chunk is exhausted and can be recycled
+                    RecycleFirstChunk(chunk);
+
+                // if we have dequeued any data, then return
+                if (dequeued != 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return false;
+        }
+
         bool EnqueueChunk(Chunk last, Chunk chunk) {
             if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
                 return false;
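
A single-threaded sketch of the batch API added above, using only the signatures
introduced in this changeset (EnqueueRange and TryDequeueRange); concurrent use is
shown in AsyncQueueBatchTest.

    var queue = new AsyncQueue<int>();

    // fill a source buffer
    var src = new int[64];
    for (int i = 0; i < src.Length; i++)
        src[i] = i;

    // enqueue the whole buffer in one call
    queue.EnqueueRange(src, 0, src.Length);

    // drain it in fixed-size batches
    var dst = new int[16];
    int got;
    while (queue.TryDequeueRange(dst, 0, dst.Length, out got)) {
        // dst[0..got) holds the dequeued items
    }

The tailGap trick in TryEnqueueBatch means the writer whose batch does not fit the
current chunk over-allocates one extra slot and thereby wins the exclusive right to
append the next chunk; other writers spin briefly until m_last is updated.
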
--- a/Implab/Safe.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/Implab/Safe.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -9,31 +9,31 @@
 {
     public static class Safe
     {
-        public static void ArgumentMatch(string param, string name, Regex rx) {
+        public static void ArgumentMatch(string value, string paramName, Regex rx) {
             if (rx == null)
                 throw new ArgumentNullException("rx");
-            if (!rx.IsMatch(param))
-                throw new ArgumentException(String.Format("The prameter value must match {0}", rx), name);
+            if (!rx.IsMatch(value))
+                throw new ArgumentException(String.Format("The prameter value must match {0}", rx), paramName);
         }
 
-        public static void ArgumentNotEmpty(string param, string name) {
-            if (String.IsNullOrEmpty(param))
-                throw new ArgumentException("The parameter can't be empty", name);
+        public static void ArgumentNotEmpty(string value, string paramName) {
+            if (String.IsNullOrEmpty(value))
+                throw new ArgumentException("The parameter can't be empty", paramName);
         }
 
-        public static void ArgumentNotEmpty<T>(T[] param, string name) {
-            if (param == null || param.Length == 0)
-                throw new ArgumentException("The array must be not emty", name);
+        public static void ArgumentNotEmpty<T>(T[] value, string paramName) {
+            if (value == null || value.Length == 0)
+                throw new ArgumentException("The array must be not emty", paramName);
         }
 
-        public static void ArgumentNotNull(object param, string name) {
-            if (param == null)
-                throw new ArgumentNullException(name);
+        public static void ArgumentNotNull(object value, string paramName) {
+            if (value == null)
+                throw new ArgumentNullException(paramName);
         }
 
-        public static void ArgumentInRange(int arg, int min, int max, string name) {
-            if (arg < min || arg > max)
-                throw new ArgumentOutOfRangeException(name);
+        public static void ArgumentInRange(int value, int min, int max, string paramName) {
+            if (value < min || value > max)
+                throw new ArgumentOutOfRangeException(paramName);
         }
 
         public static void Dispose<T>(T obj) where T : class
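
A possible follow-up, not part of this changeset: the manual argument checks in
EnqueueRange/TryDequeueRange/TryDequeueChunk could reuse the renamed Safe helpers,
roughly like the sketch below (behaviour is assumed to match the inline checks; only
the exception parameter names would differ for some invalid inputs).

    public void EnqueueRange(T[] data, int offset, int length) {
        Safe.ArgumentNotNull(data, "data");                              // data != null
        Safe.ArgumentInRange(offset, 0, data.Length, "offset");          // offset >= 0 and within the buffer
        Safe.ArgumentInRange(length, 1, data.Length - offset, "length"); // at least one element, fits the buffer
        // ... existing body unchanged
    }
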
--- a/MonoPlay/Program.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/MonoPlay/Program.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -16,36 +16,46 @@
 
             const int count = 10000000;
 
-
+            int res1 = 0, res2 = 0;
             var t1 = Environment.TickCount;
 
-            AsyncPool.ThreadRun(
+            AsyncPool.RunThread(
                 () => {
                     for (var i = 0; i < count; i++)
-                        q1.Enqueue(i);
+                        q1.Enqueue(1);
+                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
                 },
                 () => {
                     for (var i = 0; i < count; i++)
-                        q1.Enqueue(i);
+                        q1.Enqueue(2);
+                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
                 },
                 () => {
                     int temp = 0;
                     int i = 0;
                     while (i < count)
-                        if (q1.TryDequeue(out temp))
+                        if (q1.TryDequeue(out temp)) {
                             i++;
+                            res1 += temp;
+                        }
+                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
                 },
                 () => {
                     int temp = 0;
                     int i = 0;
                     while (i < count)
-                        if (q1.TryDequeue(out temp))
+                        if (q1.TryDequeue(out temp)) {
                             i++;
+                            res2 += temp;
+                        }
+                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
                 }
             )
                 .Combine()
                 .Join();
 
+            Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2);
+
             var t2 = Environment.TickCount;
             Console.WriteLine("MTQueue: {0} ms", t2 - t1);
 
@@ -65,7 +75,7 @@
             t1 = Environment.TickCount;
 
          
-            AsyncPool.ThreadRun(
+            AsyncPool.RunThread(
                 () => {
                     for (var i = 0; i < count; i++)
                         lock (q2)