diff Implab/AbstractPromise.cs @ 125:f803565868a4 v2

improved performance of promises
author cin
date Thu, 15 Jan 2015 12:09:20 +0300
parents 2573b562e328
children 671f60cd0250
line wrap: on
line diff
--- a/Implab/AbstractPromise.cs	Thu Jan 15 02:43:14 2015 +0300
+++ b/Implab/AbstractPromise.cs	Thu Jan 15 12:09:20 2015 +0300
@@ -12,10 +12,16 @@
         const int REJECTED_STATE = 3;
         const int CANCELLED_STATE = 4;
 
+        const int RESERVED_HANDLERS_COUNT = 4;
+
         int m_state;
         Exception m_error;
+        int m_handlersCount;
 
-        readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
+        readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
+        MTQueue<THandler> m_extraHandlers;
+        int m_handlerPointer = -1;
+        int m_handlersCommited;
 
         #region state managment
         bool BeginTransit() {
@@ -88,21 +94,58 @@
         protected abstract void SignalCancelled(THandler handler);
 
         void OnSuccess() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalSuccess(handler);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalSuccess(m_handlers[slot]);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalSuccess(handler);
+            }
         }
 
         void OnError() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalError(handler,m_error);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalError(m_handlers[slot],m_error);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalError(handler, m_error);
+            }
         }
 
         void OnCancelled() {
-            THandler handler;
-            while (m_handlers.TryDequeue(out handler))
-                SignalCancelled(handler);
+            var hp = m_handlerPointer;
+            var slot = hp +1 ;
+            while (slot < m_handlersCommited) {
+                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
+                    SignalCancelled(m_handlers[slot]);
+                }
+                hp = m_handlerPointer;
+                slot = hp +1 ;
+            }
+
+            if (m_extraHandlers != null) {
+                THandler handler;
+                while (m_extraHandlers.TryDequeue(out handler))
+                    SignalCancelled(handler);
+            }
         }
 
         #endregion
@@ -145,21 +188,48 @@
 
         protected void AddHandler(THandler handler) {
 
-            if (IsResolved) {
-                InvokeHandler(handler);
-
-            } else {
+            if (m_state > 1) {
                 // the promise is in the resolved state, just invoke the handler
-                m_handlers.Enqueue(handler);
+                InvokeHandler(handler);
+            } else {
+                var slot = Interlocked.Increment(ref m_handlersCount) - 1;
+
+                if (slot < RESERVED_HANDLERS_COUNT) {
+                    m_handlers[slot] = handler;
+
+                    while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
+                    }
 
+                    if (m_state > 1) {
+                        do {
+                            var hp = m_handlerPointer;
+                            slot = hp + 1;
+                            if (slot < m_handlersCommited) {
+                                if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
+                                    continue;
+                                InvokeHandler(m_handlers[slot]);
+                            }
+                            break;
+                        } while(true);
+                    }
+                } else {
+                    if (slot == RESERVED_HANDLERS_COUNT) {
+                        m_extraHandlers = new MTQueue<THandler>();
+                    } else {
+                        while (m_extraHandlers == null)
+                            Thread.MemoryBarrier();
+                    }
 
-                if (IsResolved && m_handlers.TryDequeue(out handler))
+                    m_extraHandlers.Enqueue(handler);
+
+                    if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
                     // if the promise have been resolved while we was adding the handler to the queue
                     // we can't guarantee that someone is still processing it
                     // therefore we need to fetch a handler from the queue and execute it
                     // note that fetched handler may be not the one that we have added
                     // even we can fetch no handlers at all :)
                     InvokeHandler(handler);
+                }
             }
         }