changeset 125:f803565868a4 v2

improved performance of promises
author cin
date Thu, 15 Jan 2015 12:09:20 +0300 (2015-01-15)
parents a336cb13c6a9
children f7b2b8bfbb8c
files Implab.Test/AsyncTests.cs Implab/AbstractPromise.cs Implab/Parallels/AsyncQueue.cs Implab/Parallels/DispatchPool.cs MonoPlay/Program.cs
diffstat 5 files changed, 103 insertions(+), 127 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -633,7 +633,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,10,1)) {
+            using (var pool = new WorkerPool()) {
                 const int count = 10000;
 
                 var args = new double[count];
--- a/Implab/AbstractPromise.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/Implab/AbstractPromise.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -12,10 +12,16 @@
         const int REJECTED_STATE = 3;
         const int CANCELLED_STATE = 4;
 
+        const int RESERVED_HANDLERS_COUNT = 4;
+
         int m_state;
         Exception m_error;
+        int m_handlersCount;
 
-        readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
+        readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
+        MTQueue<THandler> m_extraHandlers;
+        int m_handlerPointer = -1;
+        int m_handlersCommited;
 
         #region state managment
         bool BeginTransit() {
@@ -88,21 +94,58 @@
         protected abstract void SignalCancelled(THandler handler);
 
         void OnSuccess() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalSuccess(handler);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalSuccess(m_handlers[slot]);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalSuccess(handler);
+            }
         }
 
         void OnError() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalError(handler,m_error);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalError(m_handlers[slot],m_error);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalError(handler, m_error);
+            }
         }
 
         void OnCancelled() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalCancelled(handler);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalCancelled(m_handlers[slot]);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalCancelled(handler);
+            }
         }
 
         #endregion
@@ -145,21 +188,48 @@
 
         protected void AddHandler(THandler handler) {
 
-            if (IsResolved) {
-                InvokeHandler(handler);
-
-            } else {
+            if (m_state > 1) {
                 // the promise is in the resolved state, just invoke the handler
-                m_handlers.Enqueue(handler);
+                InvokeHandler(handler);
+            } else {
+                var slot = Interlocked.Increment(ref m_handlersCount) - 1;
+
+                if (slot < RESERVED_HANDLERS_COUNT) {
+                    m_handlers[slot] = handler;
+
+                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
+                    }
 
+                    if (m_state > 1) {
+                        do {
+                            var hp = m_handlerPointer;
+                            slot = hp + 1;
+                            if (slot < m_handlersCommited) {
+                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
+                                    continue;
+                                InvokeHandler(m_handlers[slot]);
+                            }
+                            break;
+                        } while(true);
+                    }
+                } else {
+                    if (slot == RESERVED_HANDLERS_COUNT) {
+                        m_extraHandlers = new MTQueue<THandler>();
+                    } else {
+                        while (m_extraHandlers == null)
+                            Thread.MemoryBarrier();
+                    }
 
-                if (IsResolved && m_handlers.TryDequeue(out handler))
+                    m_extraHandlers.Enqueue(handler);
+
+                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                     // if the promise have been resolved while we was adding the handler to the queue
                     // we can't guarantee that someone is still processing it
                     // therefore we need to fetch a handler from the queue and execute it
                     // note that fetched handler may be not the one that we have added
                     // even we can fetch no handlers at all :)
                     InvokeHandler(handler);
+                }
             }
         }
 
--- a/Implab/Parallels/AsyncQueue.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -147,15 +147,9 @@
         public const int DEFAULT_CHUNK_SIZE = 32;
         public const int MAX_CHUNK_SIZE = 262144;
 
-        readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
-
         Chunk m_first;
         Chunk m_last;
 
-        public AsyncQueue() {
-            m_last = m_first = new Chunk(m_chunkSize);
-        }
-
         /// <summary>
         /// Adds the specified value to the queue.
         /// </summary>
@@ -167,7 +161,7 @@
             while (last == null || !last.TryEnqueue(value, out extend)) {
                 // try to extend queue
                 if (extend || last == null) {
-                    var chunk = new Chunk(m_chunkSize, value);
+                    var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
                     if (EnqueueChunk(last, chunk))
                         break; // success! exit!
                     last = m_last;
@@ -215,7 +209,7 @@
                         var size = Math.Min(length, MAX_CHUNK_SIZE);
 
                         var chunk = new Chunk(
-                            Math.Max(size, m_chunkSize),
+                            Math.Max(size, DEFAULT_CHUNK_SIZE),
                             data,
                             offset,
                             size,
@@ -404,7 +398,7 @@
 
         public void Clear() {
             // start the new queue
-            var chunk = new Chunk(m_chunkSize);
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
 
             do {
                 Thread.MemoryBarrier();
@@ -431,7 +425,7 @@
 
         public T[] Drain() {
             // start the new queue
-            var chunk = new Chunk(m_chunkSize);
+            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
 
             do {
                 Thread.MemoryBarrier();
@@ -458,7 +452,7 @@
             
         T[] ReadChunks(Chunk chunk, object last) {
             var result = new List<T>();
-            var buffer = new T[m_chunkSize];
+            var buffer = new T[DEFAULT_CHUNK_SIZE];
             int actual;
             bool recycle;
             while (chunk != null) {
--- a/Implab/Parallels/DispatchPool.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/Implab/Parallels/DispatchPool.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -30,11 +30,9 @@
         }
 
         protected DispatchPool() {
-            int maxThreads, maxCP;
-            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
 
             m_minThreadsLimit = 0;
-            m_maxThreadsLimit = maxThreads;
+            m_maxThreadsLimit = Environment.ProcessorCount;
         }
 
         protected void InitPool() {
--- a/MonoPlay/Program.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/MonoPlay/Program.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -11,109 +11,23 @@
             if (args == null)
                 throw new ArgumentNullException("args");
 
-            var q1 = new AsyncQueue<int>();
-            var q2 = new Queue<int>();
-
             const int count = 10000000;
 
-            int res1 = 0, res2 = 0;
             var t1 = Environment.TickCount;
 
-            AsyncPool.RunThread(
-                () => {
-                    for (var i = 0; i < count; i++)
-                        q1.Enqueue(1);
-                    Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
-                },
-                () => {
-                    for (var i = 0; i < count; i++)
-                        q1.Enqueue(2);
-                    Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
-                },
-                () => {
-                    int temp = 0;
-                    int i = 0;
-                    while (i < count)
-                        if (q1.TryDequeue(out temp)) {
-                            i++;
-                            res1 += temp;
-                        }
-                    Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
-                },
-                () => {
-                    int temp = 0;
-                    int i = 0;
-                    while (i < count)
-                        if (q1.TryDequeue(out temp)) {
-                            i++;
-                            res2 += temp;
-                        }
-                    Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
-                }
-            )
-                .Bundle()
-                .Join();
+            for (int i = 0; i < count; i++) {
+                var p = new Promise<int>();
+
+                p.On(x => {}).On(x => {});
 
-            Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2);
+                p.Resolve(i);
+
+            }
+
+           
 
             var t2 = Environment.TickCount;
-            Console.WriteLine("MTQueue: {0} ms", t2 - t1);
-
-            t1 = Environment.TickCount;
-
-            for (var i = 0; i < count * 2; i++)
-                q2.Enqueue(i);
-
-            for (var i = 0; i < count * 2; i++)
-                q2.Dequeue();
-
-            t2 = Environment.TickCount;
-            Console.WriteLine("Queue: {0} ms", t2 - t1);
-
-            q2 = new Queue<int>();
-
-            t1 = Environment.TickCount;
-
-         
-            AsyncPool.RunThread(
-                () => {
-                    for (var i = 0; i < count; i++)
-                        lock (q2)
-                            q2.Enqueue(i);
-                },
-                () => {
-                    for (var i = 0; i < count; i++)
-                        lock (q2)
-                            q2.Enqueue(i);
-                },
-                () => {
-                    for (int i = 0; i < count ;)
-                        lock (q2) {
-                            if (q2.Count == 0)
-                                continue;
-                            q2.Dequeue();
-                            i++;
-                        }
-
-                },
-                () => {
-                    for (int i = 0; i < count ;)
-                        lock (q2) {
-                            if (q2.Count == 0)
-                                continue;
-                            q2.Dequeue();
-                            i++;
-                        }
-
-                }
-            )
-                .Bundle()
-                .Join();
-
-
-
-            t2 = Environment.TickCount;
-            Console.WriteLine("Queue+Lock: {0} ms", t2 - t1);
+            Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
 
         }
     }