diff Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2

improved asyncpool usability working on batch operations on asyncqueue
author cin
date Mon, 12 Jan 2015 05:19:52 +0300
parents 2573b562e328
children 62d2f1e98c4e
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Sun Jan 11 19:13:02 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 05:19:52 2015 +0300
@@ -36,19 +36,14 @@
             }
 
             public bool TryEnqueue(T value,out bool extend) {
-                extend = false;
-                int alloc;
-                do {
-                    alloc = m_alloc;
-                    if (alloc > m_size)
-                        return false;
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
+                var alloc = Interlocked.Increment(ref m_alloc) - 1;
 
-                if (alloc == m_size) {
-                    extend = true;
+                if (alloc >= m_size) {
+                    extend = alloc == m_size;
                     return false;
                 }
-                    
+
+                extend = false;
                 m_data[alloc] = value;
 
                 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
@@ -74,6 +69,38 @@
                 return true;
             }
 
+            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
+                int alloc;
+                int allocSize;
+
+                do {
+                    alloc = m_alloc;
+
+                    if (alloc > m_size) {
+                        enqueued = 0;
+                        extend = false;
+                        return false;
+                    }
+
+                    allocSize = Math.Min(m_size - m_alloc, length);
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
+
+                if (alloc == m_size) {
+                    enqueued = 0;
+                    extend = true;
+                    return false;
+                }
+
+                Array.Copy(batch, offset, m_data, alloc, allocSize);
+                enqueued = allocSize;
+                extend = false;
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
+                    // spin wait for commit
+                }
+                return true;
+            }
+
             public T GetAt(int pos) {
                 return m_data[pos];
             }