changeset 120:f1b897999260 v2

improved asyncpool usability working on batch operations on asyncqueue
author cin
date Mon, 12 Jan 2015 05:19:52 +0300
parents 2573b562e328
children 62d2f1e98c4e
files Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs MonoPlay/Program.cs
diffstat 3 files changed, 66 insertions(+), 30 deletions(-) [+]
line wrap: on
line diff
--- a/Implab/Parallels/AsyncPool.cs	Sun Jan 11 19:13:02 2015 +0300
+++ b/Implab/Parallels/AsyncPool.cs	Mon Jan 12 05:19:52 2015 +0300
@@ -1,6 +1,7 @@
 using Implab.Diagnostics;
 using System;
 using System.Threading;
+using System.Linq;
 
 namespace Implab.Parallels {
 	/// <summary>
@@ -73,5 +74,13 @@
 
             return p;
         }
+
+        public static IPromise[] ThreadRun(params Action[] func) {
+            return func.Select(f => InvokeNewThread(f)).ToArray();
+        }
+
+        public static IPromise<T>[] ThreadRun<T>(params Func<T>[] func) {
+            return func.Select(f => InvokeNewThread(f)).ToArray();
+        }
 	}
 }
--- a/Implab/Parallels/AsyncQueue.cs	Sun Jan 11 19:13:02 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 05:19:52 2015 +0300
@@ -36,19 +36,14 @@
             }
 
             public bool TryEnqueue(T value,out bool extend) {
-                extend = false;
-                int alloc;
-                do {
-                    alloc = m_alloc;
-                    if (alloc > m_size)
-                        return false;
-                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
+                var alloc = Interlocked.Increment(ref m_alloc) - 1;
 
-                if (alloc == m_size) {
-                    extend = true;
+                if (alloc >= m_size) {
+                    extend = alloc == m_size;
                     return false;
                 }
-                    
+
+                extend = false;
                 m_data[alloc] = value;
 
                 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
@@ -74,6 +69,38 @@
                 return true;
             }
 
+            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
+                int alloc;
+                int allocSize;
+
+                do {
+                    alloc = m_alloc;
+
+                    if (alloc > m_size) {
+                        enqueued = 0;
+                        extend = false;
+                        return false;
+                    }
+
+                    allocSize = Math.Min(m_size - m_alloc, length);
+                } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
+
+                if (alloc == m_size) {
+                    enqueued = 0;
+                    extend = true;
+                    return false;
+                }
+
+                Array.Copy(batch, offset, m_data, alloc, allocSize);
+                enqueued = allocSize;
+                extend = false;
+
+                while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
+                    // spin wait for commit
+                }
+                return true;
+            }
+
             public T GetAt(int pos) {
                 return m_data[pos];
             }
--- a/MonoPlay/Program.cs	Sun Jan 11 19:13:02 2015 +0300
+++ b/MonoPlay/Program.cs	Mon Jan 12 05:19:52 2015 +0300
@@ -19,30 +19,30 @@
 
             var t1 = Environment.TickCount;
 
-            new [] {
-                AsyncPool.InvokeNewThread(() => {
+            AsyncPool.ThreadRun(
+                () => {
                     for (var i = 0; i < count; i++)
                         q1.Enqueue(i);
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     for (var i = 0; i < count; i++)
                         q1.Enqueue(i);
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     int temp = 0;
                     int i = 0;
                     while (i < count)
                         if (q1.TryDequeue(out temp))
                             i++;
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     int temp = 0;
                     int i = 0;
                     while (i < count)
                         if (q1.TryDequeue(out temp))
                             i++;
-                })
-            }
+                }
+            )
                 .Combine()
                 .Join();
 
@@ -65,18 +65,18 @@
             t1 = Environment.TickCount;
 
          
-            new [] {
-                AsyncPool.InvokeNewThread(() => {
+            AsyncPool.ThreadRun(
+                () => {
                     for (var i = 0; i < count; i++)
                         lock (q2)
                             q2.Enqueue(i);
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     for (var i = 0; i < count; i++)
                         lock (q2)
                             q2.Enqueue(i);
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     for (int i = 0; i < count ;)
                         lock (q2) {
                             if (q2.Count == 0)
@@ -85,8 +85,8 @@
                             i++;
                         }
 
-                }),
-                AsyncPool.InvokeNewThread(() => {
+                },
+                () => {
                     for (int i = 0; i < count ;)
                         lock (q2) {
                             if (q2.Count == 0)
@@ -95,8 +95,8 @@
                             i++;
                         }
 
-                })
-            }
+                }
+            )
                 .Combine()
                 .Join();