changeset 23:f0568ff069a5

Слияние с promises
author cin
date Wed, 13 Nov 2013 14:03:20 +0400
parents 0c924dff5498 (current diff) 5a35900264f5 (diff)
children ee04e1fa78da
files
diffstat 9 files changed, 368 insertions(+), 285 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -14,7 +14,7 @@
             p.Then(x => res = x);
             p.Resolve(100);
 
-            Assert.AreEqual(res, 100);
+            Assert.AreEqual(100, res);
         }
 
         [TestMethod]
@@ -101,18 +101,18 @@
         public void WorkerPoolSizeTest() {
             var pool = new WorkerPool(5, 10, 0);
 
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
 
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
 
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
 
             for (int i = 0; i < 100; i++)
-                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
-            Thread.Sleep(100);
-            Assert.AreEqual(10, pool.ThreadCount);
+                pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+            Thread.Sleep(200);
+            Assert.AreEqual(10, pool.PoolSize);
 
             pool.Dispose();
         }
@@ -149,10 +149,10 @@
         [TestMethod]
         public void WorkerPoolDisposeTest() {
             var pool = new WorkerPool(5, 20);
-            Assert.AreEqual(5, pool.ThreadCount);
+            Assert.AreEqual(5, pool.PoolSize);
             pool.Dispose();
-            Thread.Sleep(100);
-            Assert.AreEqual(0, pool.ThreadCount);
+            Thread.Sleep(500);
+            Assert.AreEqual(0, pool.PoolSize);
             pool.Dispose();
         }
 
@@ -244,7 +244,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(8,100,0)) {
+            using (var pool = new WorkerPool(4,4,0)) {
                 int count = 10000;
 
                 double[] args = new double[count];
Binary file Implab.suo has changed
Binary file Implab.v11.suo has changed
--- a/Implab/IPromise.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/IPromise.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -15,19 +15,6 @@
             get;
         }
 
-        /// <summary>
-        /// The current state of the promise.
-        /// </summary>
-        PromiseState State
-        {
-            get;
-        }
 
-        /// <summary>
-        /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
-        /// handler will be invoked immediatelly.
-        /// </summary>
-        /// <param name="handler">The handler</param>
-        void HandleCancelled(Action handler);
     }
 }
--- a/Implab/Parallels/ArrayTraits.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -140,7 +140,7 @@
 
             AsyncPool.InvokeNewThread(() => {
                 for (int i = 0; i < source.Length; i++) {
-                    if(promise.State != PromiseState.Unresolved)
+                    if(promise.IsResolved)
                         break; // stop processing in case of error or cancellation
                     var idx = i;
                     semaphore.WaitOne();
--- a/Implab/Parallels/DispatchPool.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -9,10 +9,15 @@
     public abstract class DispatchPool<TUnit> : IDisposable {
         readonly int m_minThreads;
         readonly int m_maxThreads;
-        int m_runningThreads = 0;
-        int m_maxRunningThreads = 0;
-        int m_suspended = 0;
-        int m_exitRequired = 0;
+
+        int m_createdThreads = 0; // the current size of the pool
+        int m_activeThreads = 0; // the count of threads which are active
+        int m_sleepingThreads = 0; // the count of currently inactive threads
+        int m_maxRunningThreads = 0; // the meximum reached size of the pool
+        int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
+        int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+        int m_wakeEvents = 0; // the count of wake events
+        
         AutoResetEvent m_hasTasks = new AutoResetEvent(false);
 
         protected DispatchPool(int min, int max) {
@@ -44,9 +49,15 @@
                 StartWorker();
         }
 
-        public int ThreadCount {
+        public int PoolSize {
             get {
-                return m_runningThreads;
+                return m_createdThreads;
+            }
+        }
+
+        public int ActiveThreads {
+            get {
+                return m_activeThreads;
             }
         }
 
@@ -64,29 +75,120 @@
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        protected virtual bool ExtendPool() {
-            if (m_suspended > 0) {
+        #region thread execution traits
+        int SignalThread() {
+            var signals = Interlocked.Increment(ref m_wakeEvents);
+            if(signals == 1)
                 m_hasTasks.Set();
+            return signals;
+        }
+
+        bool FetchSignalOrWait(int timeout) {
+            var start = Environment.TickCount;
+
+            // означает, что поток владеет блокировкой и при успешном получении сигнала должен
+            // ее вернуть, чтобы другой ожидающий поток смог 
+            bool hasLock = false;
+            do {
+                int signals;
+                do {
+                    signals = m_wakeEvents;
+                    if (signals == 0)
+                        break;
+                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+
+                if (signals >= 1) {
+                    if (signals > 1 && hasLock)
+                        m_hasTasks.Set();
+                    return true;
+                }
+                
+                if (timeout != -1)
+                    timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+
+                // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
+                // и уйдет на пустой цикл, после чего заблокируется
+
+                hasLock = true; 
+            } while (m_hasTasks.WaitOne(timeout));
+
+            return false;
+        }
+
+        bool Sleep(int timeout) {
+            Interlocked.Increment(ref m_sleepingThreads);
+            if (FetchSignalOrWait(timeout)) {
+                Interlocked.Decrement(ref m_sleepingThreads);
                 return true;
-            } else
-                return StartWorker();
+            } else {
+                Interlocked.Decrement(ref m_sleepingThreads);
+                return false;
+            }
         }
+        #endregion
 
         /// <summary>
         /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
         /// </summary>
-        protected void WakePool() {
-            m_hasTasks.Set(); // wake sleeping thread;
+        protected void GrowPool() {
+            if (m_exitRequired != 0)
+                return;
+            if (m_sleepingThreads > m_wakeEvents) {
+                //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
+
+                // all sleeping threads may gone
+                SignalThread(); // wake a sleeping thread;
 
-            if (AllocateThreadSlot(1)) {
-                var worker = new Thread(this.Worker);
-                worker.IsBackground = true;
-                worker.Start();
+                // we can't check whether signal has been processed
+                // anyway it may take some time for the thread to start
+                // we will ensure that at least one thread is running
+
+                if (AllocateThreadSlot(1)) {
+                    // if there were no threads in the pool
+                    var worker = new Thread(this.Worker);
+                    worker.IsBackground = true;
+                    worker.Start();
+                }
+            } else {
+                // if there is no sleeping threads in the pool
+                StartWorker();
             }
         }
 
-        protected virtual void Suspend() {
-            m_hasTasks.WaitOne();
+        private bool Suspend() {
+            //no tasks left, exit if the thread is no longer needed
+            bool last;
+            bool requestExit;
+
+            // if threads have a timeout before releasing
+            if (m_releaseTimeout > 0)
+                requestExit = !Sleep(m_releaseTimeout);
+            else
+                requestExit = true;
+
+            if (!requestExit)
+                return true;
+
+            // release unsused thread
+            if (requestExit && ReleaseThreadSlot(out last)) {
+                // in case at the moment the last thread was being released
+                // a new task was added to the queue, we need to try
+                // to revoke the thread to avoid the situation when the task is left unprocessed
+                if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
+                    if (AllocateThreadSlot(1))
+                        return true; // spin again...
+                    else
+                        SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
+                    
+                }
+
+                return false;
+            }
+
+            // wait till infinity
+            Sleep(-1);
+
+            return true;
         }
 
         #region thread slots traits
@@ -95,11 +197,11 @@
             int current;
             // use spins to allocate slot for the new thread
             do {
-                current = m_runningThreads;
+                current = m_createdThreads;
                 if (current >= m_maxThreads || m_exitRequired != 0)
                     // no more slots left or the pool has been disposed
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
 
             UpdateMaxThreads(current + 1);
 
@@ -107,7 +209,7 @@
         }
 
         bool AllocateThreadSlot(int desired) {
-            if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
+            if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
                 return false;
 
             UpdateMaxThreads(desired);
@@ -120,11 +222,11 @@
             int current;
             // use spins to release slot for the new thread
             do {
-                current = m_runningThreads;
+                current = m_createdThreads;
                 if (current <= m_minThreads && m_exitRequired == 0)
                     // the thread is reserved
                     return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+            } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
 
             last = (current == 1);
 
@@ -136,7 +238,7 @@
         /// </summary>
         /// <returns>true - no more threads left</returns>
         bool ReleaseThreadSlotAnyway() {
-            var left = Interlocked.Decrement(ref m_runningThreads);
+            var left = Interlocked.Decrement(ref m_createdThreads);
             return left == 0;
         }
 
@@ -164,54 +266,41 @@
             }
         }
 
-        bool FetchTask(out TUnit unit) {
+        protected abstract void InvokeUnit(TUnit unit);
+
+        void Worker() {
+            TUnit unit;
+            //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
+            Interlocked.Increment(ref m_activeThreads);
             do {
                 // exit if requested
                 if (m_exitRequired != 0) {
                     // release the thread slot
+                    Interlocked.Decrement(ref m_activeThreads);
                     if (ReleaseThreadSlotAnyway()) // it was the last worker
                         m_hasTasks.Dispose();
                     else
-                        m_hasTasks.Set(); // wake next worker
+                        SignalThread(); // wake next worker
                     unit = default(TUnit);
-                    return false;
+                    break;
                 }
 
                 // fetch task
                 if (TryDequeue(out unit)) {
-                    ExtendPool();
-                    return true;
+                    InvokeUnit(unit);
+                    continue;
                 }
 
-                //no tasks left, exit if the thread is no longer needed
-                bool last;
-                if (ReleaseThreadSlot(out last)) {
-                    if (last && m_hasTasks.WaitOne(0)) {
-                        if (AllocateThreadSlot(1))
-                            continue; // spin again...
-                        else
-                            // we failed to reallocate slot for this thread
-                            // therefore we need to release the event
-                            m_hasTasks.Set(); 
-                    }
-
-                    return false;
-                }
+                Interlocked.Decrement(ref m_activeThreads);
 
                 // entering suspend state
-                Interlocked.Increment(ref m_suspended);
                 // keep this thread and wait                
-                Suspend();
-                Interlocked.Decrement(ref m_suspended);
+                if (!Suspend())
+                    break;
+                //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
+                Interlocked.Increment(ref m_activeThreads);
             } while (true);
-        }
-
-        protected abstract void InvokeUnit(TUnit unit);
-
-        void Worker() {
-            TUnit unit;
-            while (FetchTask(out unit))
-                InvokeUnit(unit);
+            //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
         }
 
         protected virtual void Dispose(bool disposing) {
@@ -221,7 +310,10 @@
                         return;
 
                     // wake sleeping threads
-                    m_hasTasks.Set();
+                    if (m_createdThreads > 0)
+                        SignalThread();
+                    else
+                        m_hasTasks.Dispose();
                     GC.SuppressFinalize(this);
                 }
             }
--- a/Implab/Parallels/MTQueue.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/MTQueue.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -42,12 +42,13 @@
                 next = first.next;
                 if (next == null) {
                     // this is the last element,
-                    // then try to update tail
+                    // then try to update the tail
                     if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
-                        // this is inconsistent situation which means that the queue is empty
+                        // this is a ace condition
                         if (m_last == null)
+                            // the queue is empty
                             return false;
-                        // tail has been changed, that means that we need to restart
+                        // tail has been changed, than we need to restart
                         continue; 
                     }
 
--- a/Implab/Parallels/WorkerPool.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -57,16 +57,8 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            if(!ExtendPool())
-                WakePool();
-        }
-
-        protected override bool ExtendPool() {
-            if (m_queueLength <= m_threshold*ThreadCount)
-                // in this case we are in active thread and it request for additional workers
-                // satisfy it only when queue is longer than threshold
-                return false;
-            return base.ExtendPool();
+            if (len > m_threshold*ActiveThreads)
+                GrowPool();
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -81,9 +73,5 @@
             unit();
         }
 
-        protected override void Suspend() {
-            if (m_queueLength == 0)
-                base.Suspend();
-        }
     }
 }
--- a/Implab/Promise.cs	Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Promise.cs	Wed Nov 13 14:03:20 2013 +0400
@@ -3,6 +3,7 @@
 using System.Reflection;
 using System.Diagnostics;
 using System.Threading;
+using Implab.Parallels;
 
 namespace Implab {
 
@@ -48,24 +49,53 @@
     /// </remarks>
     public class Promise<T> : IPromise {
 
-        struct ResultHandlerInfo {
+        struct HandlerDescriptor {
             public ResultHandler<T> resultHandler;
             public ErrorHandler errorHandler;
+            public Action cancellHandler;
+
+            public void Resolve(T result) {
+                if (resultHandler != null)
+                    try {
+                        resultHandler(result);
+                    } catch (Exception e) {
+                        Reject(e);
+                    }
+            }
+
+            public void Reject(Exception err) {
+                if (errorHandler != null)
+                    try {
+                        errorHandler(err);
+                    } catch {
+                    }
+            }
+
+            public void Cancel() {
+                if (cancellHandler != null)
+                    try {
+                        cancellHandler();
+                    } catch {
+                    }
+            }
         }
 
+        const int UnresolvedSate = 0;
+        const int TransitionalState = 1;
+        const int ResolvedState = 2;
+        const int RejectedState = 3;
+        const int CancelledState = 4;
+
         readonly IPromise m_parent;
-
-        LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
-        LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
+        readonly bool m_cancellable;
 
-        readonly object m_lock = new Object();
-        readonly bool m_cancellable;
         int m_childrenCount = 0;
-
-        PromiseState m_state;
+        int m_state;
         T m_result;
         Exception m_error;
 
+        readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
+
         public Promise() {
             m_cancellable = true;
         }
@@ -73,8 +103,6 @@
         public Promise(IPromise parent, bool cancellable) {
             m_cancellable = cancellable;
             m_parent = parent;
-            if (parent != null)
-                parent.HandleCancelled(InternalCancel);
         }
 
         void InternalCancel() {
@@ -82,22 +110,39 @@
             Cancel(false);
         }
 
+        bool BeginTransit() {
+            return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
+        }
+
+        void CompleteTransit(int state) {
+            if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
+                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+        }
+
+        public bool IsResolved {
+            get {
+                return m_state > 1;
+            }
+        }
+
+        public bool IsCancelled {
+            get {
+                return m_state == CancelledState;
+            }
+        }
+
         /// <summary>
         /// Выполняет обещание, сообщая об успешном выполнении.
         /// </summary>
         /// <param name="result">Результат выполнения.</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Resolve(T result) {
-            lock (m_lock) {
-                if (m_state == PromiseState.Cancelled)
-                    return;
-                if (m_state != PromiseState.Unresolved)
-                    throw new InvalidOperationException("The promise is already resolved");
+            if (BeginTransit()) {
                 m_result = result;
-                m_state = PromiseState.Resolved;
-            }
-
-            OnStateChanged();
+                CompleteTransit(ResolvedState);
+                OnStateChanged();
+            } else if (m_state != CancelledState)
+                throw new InvalidOperationException("The promise is already resolved");
         }
 
         /// <summary>
@@ -111,16 +156,12 @@
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
-            lock (m_lock) {
-                if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
-                    return;
-                if (m_state != PromiseState.Unresolved)
-                    throw new InvalidOperationException("The promise is already resolved");
+            if (BeginTransit()) {
                 m_error = error;
-                m_state = PromiseState.Rejected;
-            }
-
-            OnStateChanged();
+                CompleteTransit(RejectedState);
+                OnStateChanged();
+            } else if (m_state == ResolvedState)
+                throw new InvalidOperationException("The promise is already resolved");
         }
 
         /// <summary>
@@ -144,27 +185,27 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
-
+            ResultHandler<T> resultHandler;
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
+            ErrorHandler errorHandler;
             if (error != null)
-                handlerInfo.errorHandler = x => {
+                errorHandler = x => {
                     try {
                         error(x);
                     } catch { }
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = medium.Reject;
+                errorHandler = medium.Reject;
 
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, errorHandler, medium.InternalCancel);
 
             return medium;
         }
@@ -182,27 +223,28 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
+            ResultHandler<T> resultHandler;
+            ErrorHandler errorHandler;
 
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
             if (error != null)
-                handlerInfo.errorHandler = x => {
+                errorHandler = x => {
                     try {
                         medium.Resolve(error(x));
                     } catch { }
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = medium.Reject;
+                errorHandler = medium.Reject;
 
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, errorHandler, medium.InternalCancel);
 
             return medium;
         }
@@ -214,19 +256,17 @@
 
             var medium = new Promise<T>(this, true);
 
-            var handlerInfo = new ResultHandlerInfo();
-
+            ResultHandler<T> resultHandler;
+            
             if (success != null)
-                handlerInfo.resultHandler = x => {
+                resultHandler = x => {
                     success(x);
                     medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = medium.Resolve;
+                resultHandler = medium.Resolve;
 
-            handlerInfo.errorHandler = medium.Reject;
-
-            AddHandler(handlerInfo);
+            AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
 
             return medium;
         }
@@ -249,15 +289,17 @@
 
             var medium = new Promise<T>(this, true);
 
-            AddHandler(new ResultHandlerInfo {
-                errorHandler = e => {
+            AddHandler(
+                null,
+                e => {
                     try {
                         medium.Resolve(handler(e));
                     } catch (Exception e2) {
                         medium.Reject(e2);
                     }
-                }
-            });
+                },
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -268,8 +310,8 @@
 
             var medium = new Promise<T>();
 
-            AddHandler(new ResultHandlerInfo {
-                resultHandler = x => {
+            AddHandler(
+                x => {
                     // to avoid handler being called multiple times we handle exception by ourselfs
                     try {
                         handler();
@@ -278,13 +320,16 @@
                         medium.Reject(e);
                     }
                 },
-                errorHandler = x => {
+
+                e => {
                     try {
                         handler();
                     } catch { }
-                    medium.Reject(x);
-                }
-            });
+                    medium.Reject(e);
+                },
+
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -304,17 +349,22 @@
             // создаем прицепленное обещание
             var chained = new Promise<TNew>();
 
-            AddHandler(new ResultHandlerInfo() {
-                resultHandler = result => chained.Resolve(mapper(result)),
-                errorHandler = delegate(Exception e) {
-                    if (error != null)
-                        try {
-                            error(e);
-                        } catch { }
-                    // в случае ошибки нужно передать исключение дальше по цепочке
-                    chained.Reject(e);
-                }
-            });
+            ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
+            ErrorHandler errorHandler = delegate(Exception e) {
+                if (error != null)
+                    try {
+                        error(e);
+                    } catch { }
+                // в случае ошибки нужно передать исключение дальше по цепочке
+                chained.Reject(e);
+            };
+
+
+            AddHandler(
+                resultHandler,
+                errorHandler,
+                chained.InternalCancel
+            );
 
             return chained;
         }
@@ -341,27 +391,32 @@
             // передать через него результаты работы.
             var medium = new Promise<TNew>(this, true);
 
-            AddHandler(new ResultHandlerInfo {
-                resultHandler = delegate(T result) {
-                    if (medium.State == PromiseState.Cancelled)
-                        return;
+            ResultHandler<T> resultHandler = delegate(T result) {
+                if (medium.IsCancelled)
+                    return;
 
-                    var promise = chained(result);
+                var promise = chained(result);
 
-                    // notify chained operation that it's not needed
-                    medium.Cancelled(() => promise.Cancel());
-                    promise.Then(
-                        x => medium.Resolve(x),
-                        e => medium.Reject(e)
-                    );
-                },
-                errorHandler = delegate(Exception e) {
-                    if (error != null)
-                        error(e);
-                    // в случае ошибки нужно передать исключение дальше по цепочке
-                    medium.Reject(e);
-                }
-            });
+                // notify chained operation that it's not needed
+                medium.Cancelled(() => promise.Cancel());
+                promise.Then(
+                    x => medium.Resolve(x),
+                    e => medium.Reject(e)
+                );
+            };
+
+            ErrorHandler errorHandler = delegate(Exception e) {
+                if (error != null)
+                    error(e);
+                // в случае ошибки нужно передать исключение дальше по цепочке
+                medium.Reject(e);
+            };
+
+            AddHandler(
+                resultHandler,
+                errorHandler,
+                medium.InternalCancel
+            );
 
             return medium;
         }
@@ -371,19 +426,19 @@
         }
 
         public Promise<T> Cancelled(Action handler) {
-            if (handler == null)
-                return this;
-            lock (m_lock) {
-                if (m_state == PromiseState.Unresolved)
-                    m_cancelHandlers.AddLast(handler);
-                else if (m_state == PromiseState.Cancelled)
-                    handler();
-            }
+            AddHandler(null, null, handler);
             return this;
         }
 
-        public void HandleCancelled(Action handler) {
-            Cancelled(handler);
+        public Promise<T> Finally(Action handler) {
+            if (handler == null)
+                throw new ArgumentNullException("handler");
+            AddHandler(
+                x => handler(),
+                e => handler(),
+                handler
+            );
+            return this;
         }
 
         /// <summary>
@@ -415,15 +470,15 @@
             if (!evt.WaitOne(timeout, true))
                 throw new TimeoutException();
 
-            switch (State) {
-                case PromiseState.Resolved:
+            switch (m_state) {
+                case ResolvedState:
                     return m_result;
-                case PromiseState.Cancelled:
+                case CancelledState:
                     throw new OperationCanceledException();
-                case PromiseState.Rejected:
+                case RejectedState:
                     throw new TargetInvocationException(m_error);
                 default:
-                    throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
             }
         }
 
@@ -431,40 +486,45 @@
             return Join(Timeout.Infinite);
         }
 
-        void AddHandler(ResultHandlerInfo handler) {
-            bool invokeRequired = false;
+        void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
+            Interlocked.Increment(ref m_childrenCount);
+
+            HandlerDescriptor handler = new HandlerDescriptor {
+                resultHandler = success,
+                errorHandler = error,
+                cancellHandler = cancel
+            };
 
-            lock (m_lock) {
-                m_childrenCount++;
-                if (m_state == PromiseState.Unresolved) {
-                    m_resultHandlers.AddLast(handler);
-                } else
-                    invokeRequired = true;
+            bool queued;
+
+            if (!IsResolved) {
+                m_handlers.Enqueue(handler);
+                queued = true;
+            } else {
+                // the promise is in resolved state, just invoke the handled with minimum overhead
+                queued = false;
+                InvokeHandler(handler);
             }
 
-            // обработчики не должны блокировать сам объект
-            if (invokeRequired)
+            if (queued && IsResolved && m_handlers.TryDequeue(out handler))
+                // if the promise have been resolved while we was adding handler to the queue
+                // we can't guarantee that someone is still processing it
+                // therefore we will fetch a handler from the queue and execute it
+                // note that fetched handler may be not the one we have added
                 InvokeHandler(handler);
+
         }
 
-        void InvokeHandler(ResultHandlerInfo handler) {
+        void InvokeHandler(HandlerDescriptor handler) {
             switch (m_state) {
-                case PromiseState.Resolved:
-                    try {
-                        if (handler.resultHandler != null)
-                            handler.resultHandler(m_result);
-                    } catch (Exception e) {
-                        try {
-                            if (handler.errorHandler != null)
-                                handler.errorHandler(e);
-                        } catch { }
-                    }
+                case ResolvedState:
+                    handler.Resolve(m_result);
                     break;
-                case PromiseState.Rejected:
-                    try {
-                        if (handler.errorHandler != null)
-                            handler.errorHandler(m_error);
-                    } catch { }
+                case RejectedState:
+                    handler.Reject(m_error);
+                    break;
+                case CancelledState:
+                    handler.Cancel();
                     break;
                 default:
                     // do nothing
@@ -473,76 +533,31 @@
         }
 
         protected virtual void OnStateChanged() {
-            switch (m_state) {
-                case PromiseState.Resolved:
-                    foreach (var resultHandlerInfo in m_resultHandlers)
-                        try {
-                            if (resultHandlerInfo.resultHandler != null)
-                                resultHandlerInfo.resultHandler(m_result);
-                        } catch (Exception e) {
-                            try {
-                                if (resultHandlerInfo.errorHandler != null)
-                                    resultHandlerInfo.errorHandler(e);
-                            } catch { }
-                        }
-                    break;
-                case PromiseState.Cancelled:
-                    foreach (var cancelHandler in m_cancelHandlers)
-                        cancelHandler();
-                    break;
-                case PromiseState.Rejected:
-                    foreach (var resultHandlerInfo in m_resultHandlers)
-                        try {
-                            if (resultHandlerInfo.errorHandler != null)
-                                resultHandlerInfo.errorHandler(m_error);
-                        } catch { }
-                    break;
-                default:
-                    throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
-            }
-
-            m_resultHandlers = null;
-            m_cancelHandlers = null;
+            HandlerDescriptor handler;
+            while (m_handlers.TryDequeue(out handler))
+                InvokeHandler(handler);
         }
 
 
 
         public bool IsExclusive {
             get {
-                lock (m_lock) {
-                    return m_childrenCount <= 1;
-                }
-            }
-        }
-
-        public PromiseState State {
-            get {
-                lock (m_lock) {
-                    return m_state;
-                }
+                return m_childrenCount <= 1;
             }
         }
 
         protected bool Cancel(bool dependencies) {
-            bool result;
-
-            lock (m_lock) {
-                if (m_state == PromiseState.Unresolved) {
-                    m_state = PromiseState.Cancelled;
-                    result = true;
-                } else {
-                    result = false;
-                }
-            }
-
-            if (result)
+            if (BeginTransit()) {
+                CompleteTransit(CancelledState);
                 OnStateChanged();
 
-            if (dependencies && m_parent != null && m_parent.IsExclusive) {
-                m_parent.Cancel();
+                if (dependencies && m_parent != null && m_parent.IsExclusive)
+                    m_parent.Cancel();
+
+                return true;
+            } else {
+                return false;
             }
-
-            return result;
         }
 
     }