diff Implab/Parallels/AsyncQueue.cs @ 122:0c8685c8b56b v2

minor fixes and improvements of AsyncQueue, additional tests
author cin
date Mon, 12 Jan 2015 22:20:45 +0300
parents 62d2f1e98c4e
children f4d6ea6969cc
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 18:19:41 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 22:20:45 2015 +0300
@@ -78,42 +78,31 @@
             }
 
             public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
-                int alloc;
-                int allocSize;
-
-                // in case the batch size is larger than a free space in chunk
-                // tailGap is used to over allocate the space in the chunk to
-                // get exclusive permission on creation of the next one.
-                int tailGap = 0;
-
-                do {
-                    alloc = m_alloc;
+                //int alloc;
+                //int allocSize;
 
-                    if (alloc > m_size) {
-                        // the chunk is full and someone already
-                        // creating the new one
-                        enqueued = 0; // nothing was added
-                        extend = false; // the caller shouldn't try to extend the queue
-                        return false; // nothing was added
-                    }
+                var alloc = Interlocked.Add(ref m_alloc, length) - length;
+                if (alloc > m_size) {
+                    // the chunk is full and someone already
+                    // creating the new one
+                    enqueued = 0; // nothing was added
+                    extend = false; // the caller shouldn't try to extend the queue
+                    return false; // nothing was added
+                }
 
-                    allocSize = Math.Min(m_size - alloc, length);
-                    if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
-                        tailGap = 1; // overallocate space to get exclusive permission to extend queue
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
-                    
-                extend = tailGap != 0;
-                enqueued = allocSize;
+                enqueued = Math.Min(m_size - alloc, length);
+                extend = length > enqueued;
 
-                // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
-                if (alloc == m_size)
+                if (enqueued == 0)
                     return false;
 
-                Array.Copy(batch, offset, m_data, alloc, allocSize);
 
-                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
+                Array.Copy(batch, offset, m_data, alloc, enqueued);
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
                     // spin wait for commit
                 }
+
                 return true;
             }
 
@@ -169,7 +158,7 @@
                 if (extend || last == null) {
                     var chunk = new Chunk(m_chunkSize, value);
                     if (EnqueueChunk(last, chunk))
-                        break;
+                        break; // success! exit!
                     last = m_last;
                 } else {
                     while (last == m_last) {
@@ -326,7 +315,7 @@
         /// Tries to dequeue all remaining data in the first chunk.
         /// </summary>
         /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
-        /// <param name="buffer">The buffer to which data will be written.</param>
+        /// <param name="buffer">The buffer to which the data will be written.</param>
         /// <param name="offset">The offset in the buffer at which the data will be written.</param>
         /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
         /// <param name="dequeued">The actual amount of the dequeued data.</param>