diff Implab/Parallels/AsyncQueue.cs @ 121:62d2f1e98c4e v2

working version of AsyncQueue and batch operations tests
author cin
date Mon, 12 Jan 2015 18:19:41 +0300
parents f1b897999260
children 0c8685c8b56b
--- a/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 05:19:52 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 18:19:41 2015 +0300
@@ -27,6 +27,14 @@
                 m_data[0] = value;
             }
 
+            // Creates a chunk of the specified size prefilled with 'length' items
+            // copied from 'data' starting at 'offset'; 'alloc' presets the allocation
+            // mark of the new chunk (it may be greater than 'size', see EnqueueRange).
+            public Chunk(int size, T[] data, int offset, int length, int alloc) {
+                m_size = size;
+                m_hi = length;
+                m_alloc = alloc;
+                m_data = new T[size];
+                Array.Copy(data, offset, m_data, 0, length);
+            }
+
             public int Low {
                 get { return m_low; }
             }
@@ -35,7 +43,7 @@
                 get { return m_hi; }
             }
 
-            public bool TryEnqueue(T value,out bool extend) {
+            public bool TryEnqueue(T value, out bool extend) {
                 var alloc = Interlocked.Increment(ref m_alloc) - 1;
 
                 if (alloc >= m_size) {
@@ -52,7 +60,7 @@
                 return true;
             }
 
-            public bool TryDequeue(out T value,out bool recycle) {
+            public bool TryDequeue(out T value, out bool recycle) {
                 int low;
                 do {
                     low = m_low;
@@ -73,27 +81,35 @@
                 int alloc;
                 int allocSize;
 
+                // In case the batch is larger than the free space left in the chunk,
+                // tailGap is used to over-allocate space in the chunk and thereby
+                // get exclusive permission to create the next one.
+                int tailGap = 0;
+
                 do {
                     alloc = m_alloc;
 
                     if (alloc > m_size) {
-                        enqueued = 0;
-                        extend = false;
-                        return false;
+                        // the chunk is full and someone is already
+                        // creating the new one
+                        enqueued = 0; // nothing was added
+                        extend = false; // the caller shouldn't try to extend the queue
+                        return false;
                     }
 
-                    allocSize = Math.Min(m_size - m_alloc, length);
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
+                    allocSize = Math.Min(m_size - alloc, length);
+                    if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
+                        tailGap = 1; // over-allocate space to get exclusive permission to extend the queue
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
+
+                extend = tailGap != 0;
+                enqueued = allocSize;
 
-                if (alloc == m_size) {
-                    enqueued = 0;
-                    extend = true;
+                // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
+                if (alloc == m_size)
                     return false;
-                }
 
                 Array.Copy(batch, offset, m_data, alloc, allocSize);
-                enqueued = allocSize;
-                extend = false;
 
                 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
                     // spin wait for commit
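For illustration, a minimal sketch of the over-allocation arithmetic above with assumed numbers (not part of the changeset):

    // assumed state: a chunk with m_size = 32 and m_alloc = 30, and a batch of length = 10
    int size = 32, alloc = 30, length = 10;
    int allocSize = Math.Min(size - alloc, length); // 2 items still fit into this chunk
    int tailGap = allocSize < length ? 1 : 0;       // 1: the batch doesn't fit entirely
    int newAlloc = alloc + allocSize + tailGap;     // 33 > size, the chunk is now closed
    // the caller copies 2 items, reports enqueued = 2 and extend = true, and becomes
    // the only writer allowed to append the next chunk; concurrent writers observe
    // m_alloc > m_size and return false until the new chunk is published
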
@@ -101,12 +117,35 @@
                 return true;
             }
 
+            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
+                int low, hi, batchSize;
+
+                do {
+                    low = m_low;
+                    hi = m_hi;
+                    if (low >= hi) {
+                        dequeued = 0;
+                        recycle = (low == m_size); // recycling could be restarted and we need to signal again
+                        return false;
+                    }
+                    batchSize = Math.Min(hi - low, length);
+                } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
+
+                recycle = (low == m_size - batchSize);
+                dequeued = batchSize;
+
+                Array.Copy(m_data, low, buffer, offset, batchSize);
+
+                return true;
+            }
+
             public T GetAt(int pos) {
                 return m_data[pos];
             }
         }
 
         public const int DEFAULT_CHUNK_SIZE = 32;
+        public const int MAX_CHUNK_SIZE = 262144;
 
         readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
 
@@ -117,11 +156,15 @@
             m_last = m_first = new Chunk(m_chunkSize);
         }
 
+        /// <summary>
+        /// Adds the specified value to the queue.
+        /// </summary>
+        /// <param name="value">Tha value which will be added to the queue.</param>
         public void Enqueue(T value) {
             var last = m_last;
             // spin wait to the new chunk
             bool extend = true;
-            while(last == null || !last.TryEnqueue(value, out extend)) {
+            while (last == null || !last.TryEnqueue(value, out extend)) {
                 // try to extend queue
                 if (extend || last == null) {
                     var chunk = new Chunk(m_chunkSize, value);
@@ -129,14 +172,88 @@
                         break;
                     last = m_last;
                 } else {
-                    while (last != m_last) {
+                    while (last == m_last) {
                         Thread.MemoryBarrier();
-                        last = m_last;
                     }
+                    last = m_last;
                 }
             }
         }
 
+        /// <summary>
+        /// Adds the specified data to the queue.
+        /// </summary>
+        /// <param name="data">The buffer which contains the data to be enqueued.</param>
+        /// <param name="offset">The offset of the data in the buffer.</param>
+        /// <param name="length">The size of the data to read from the buffer.</param>
+        public void EnqueueRange(T[] data, int offset, int length) {
+            if (data == null)
+                throw new ArgumentNullException("data");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > data.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var last = m_last;
+
+            bool extend;
+            int enqueued;
+
+            while (length > 0) {
+                extend = true;
+                if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
+                    length -= enqueued;
+                    offset += enqueued;
+                }
+
+                if (extend) {
+                    // there was not enough space in the chunk
+                    // or there were no chunks in the queue
+
+                    while (length > 0) {
+
+                        var size = Math.Min(length, MAX_CHUNK_SIZE);
+
+                        var chunk = new Chunk(
+                            Math.Max(size, m_chunkSize),
+                            data,
+                            offset,
+                            size,
+                            length // length >= size
+                        );
+
+                        if (!EnqueueChunk(last, chunk)) {
+                            // looks like the queue has been updated, proceed from the beginning
+                            last = m_last;
+                            break;
+                        }
+
+                        // we have successfully added the new chunk
+                        last = chunk;
+                        length -= size;
+                        offset += size;
+                    }
+                } else {
+                    // we don't need to extend the queue; if all the data was
+                    // successfully enqueued, we're done
+                    if (length == 0)
+                        break;
+
+                    // otherwise spin-wait while someone else is extending the queue
+                    while (last == m_last) {
+                        Thread.MemoryBarrier();
+                    }
+
+                    last = m_last;
+                }
+            }
+        }
+
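A minimal usage sketch of the new batch enqueue API (a hypothetical example, not part of the changeset; it assumes a parameterless AsyncQueue<T> constructor):

    var queue = new AsyncQueue<int>();
    var data = new int[100000];
    for (int i = 0; i < data.Length; i++)
        data[i] = i;

    // enqueue the whole buffer in one call; internally the data is split
    // into chunks of at most MAX_CHUNK_SIZE items
    queue.EnqueueRange(data, 0, data.Length);

    // enqueue only a slice of the buffer
    queue.EnqueueRange(data, 10, 20);
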
+        /// <summary>
+        /// Tries to retrieve the first element from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if the element was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="value">The value of the dequeued element.</param>
         public bool TryDequeue(out T value) {
             var chunk = m_first;
             bool recycle;
@@ -161,6 +278,92 @@
             return false;
         }
 
+        /// <summary>
+        /// Tries to dequeue the specified amount of data from the queue.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">The maximum amount of data to be retrieved.</param>
+        /// <param name="dequeued">The actual amout of the retrieved data.</param>
+        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    offset += actual;
+                    length -= actual;
+                    dequeued += actual;
+                }
+
+                if (recycle) // this chunk is exhausted
+                    RecycleFirstChunk(chunk);
+                else if (actual == 0)
+                    break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
+
+                if (length == 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return dequeued != 0;
+        }
+
+        /// <summary>
+        /// Tries to dequeue the remaining data from the first non-empty chunk, at most <paramref name="length"/> items.
+        /// </summary>
+        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
+        /// <param name="buffer">The buffer to which data will be written.</param>
+        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
+        /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
+        /// <param name="dequeued">The actual amount of the dequeued data.</param>
+        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
+            if (buffer == null)
+                throw new ArgumentNullException("buffer");
+            if (offset < 0)
+                throw new ArgumentOutOfRangeException("offset");
+            if (length < 1 || offset + length > buffer.Length)
+                throw new ArgumentOutOfRangeException("length");
+
+            var chunk = m_first;
+            bool recycle;
+            dequeued = 0;
+
+            while (chunk != null) {
+
+                int actual;
+                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
+                    dequeued = actual;
+                }
+
+                if (recycle) // this chunk is exhausted
+                    RecycleFirstChunk(chunk);
+
+                // if we have dequeued any data, then return
+                if (dequeued != 0)
+                    return true;
+
+                // we still may dequeue something
+                // try again
+                chunk = m_first;
+            }
+
+            return false;
+        }
+
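Continuing the enqueue sketch above, the corresponding batch dequeue calls (again hypothetical usage, not part of the changeset): TryDequeueRange fills the buffer across chunk boundaries, while TryDequeueChunk returns after the first chunk that yields any data.

    var buffer = new int[128];
    int dequeued;

    // drain the queue in batches of up to buffer.Length items
    while (queue.TryDequeueRange(buffer, 0, buffer.Length, out dequeued)) {
        // process buffer[0 .. dequeued)
    }

    // alternatively, take only what the first non-empty chunk has to offer
    if (queue.TryDequeueChunk(buffer, 0, buffer.Length, out dequeued)) {
        // process buffer[0 .. dequeued)
    }
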
         bool EnqueueChunk(Chunk last, Chunk chunk) {
             if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
                 return false;