Mercurial > pub > ImplabNet
diff Implab/AbstractPromise.cs @ 125:f803565868a4 v2
improved performance of promises
author | cin |
---|---|
date | Thu, 15 Jan 2015 12:09:20 +0300 |
parents | 2573b562e328 |
children | 671f60cd0250 |
line wrap: on
line diff
--- a/Implab/AbstractPromise.cs Thu Jan 15 02:43:14 2015 +0300 +++ b/Implab/AbstractPromise.cs Thu Jan 15 12:09:20 2015 +0300 @@ -12,10 +12,16 @@ const int REJECTED_STATE = 3; const int CANCELLED_STATE = 4; + const int RESERVED_HANDLERS_COUNT = 4; + int m_state; Exception m_error; + int m_handlersCount; - readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); + readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; + MTQueue<THandler> m_extraHandlers; + int m_handlerPointer = -1; + int m_handlersCommited; #region state managment bool BeginTransit() { @@ -88,21 +94,58 @@ protected abstract void SignalCancelled(THandler handler); void OnSuccess() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalSuccess(handler); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalSuccess(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalSuccess(handler); + } } void OnError() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalError(handler,m_error); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalError(m_handlers[slot],m_error); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalError(handler, m_error); + } } void OnCancelled() { - THandler handler; - while (m_handlers.TryDequeue(out handler)) - SignalCancelled(handler); + var hp = m_handlerPointer; + var slot = hp +1 ; + while (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { + SignalCancelled(m_handlers[slot]); + } + hp = m_handlerPointer; + slot = hp +1 ; + } + + if (m_extraHandlers != null) { + THandler handler; + while (m_extraHandlers.TryDequeue(out handler)) + SignalCancelled(handler); + } } #endregion @@ -145,21 +188,48 @@ protected void AddHandler(THandler handler) { - if (IsResolved) { - InvokeHandler(handler); - - } else { + if (m_state > 1) { // the promise is in the resolved state, just invoke the handler - m_handlers.Enqueue(handler); + InvokeHandler(handler); + } else { + var slot = Interlocked.Increment(ref m_handlersCount) - 1; + + if (slot < RESERVED_HANDLERS_COUNT) { + m_handlers[slot] = handler; + + while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { + } + if (m_state > 1) { + do { + var hp = m_handlerPointer; + slot = hp + 1; + if (slot < m_handlersCommited) { + if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) + continue; + InvokeHandler(m_handlers[slot]); + } + break; + } while(true); + } + } else { + if (slot == RESERVED_HANDLERS_COUNT) { + m_extraHandlers = new MTQueue<THandler>(); + } else { + while (m_extraHandlers == null) + Thread.MemoryBarrier(); + } - if (IsResolved && m_handlers.TryDequeue(out handler)) + m_extraHandlers.Enqueue(handler); + + if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) InvokeHandler(handler); + } } }