changeset 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400 (2013-11-07)
parents dfa21d507bc5 (current diff) 7cd4a843b4e4 (diff)
children f0568ff069a5
files Implab/AsyncPool.cs
diffstat 24 files changed, 1538 insertions(+), 233 deletions(-) [+]
line wrap: on
line diff
--- a/.hgignore	Mon Oct 21 02:34:31 2013 +0400
+++ b/.hgignore	Fri Nov 08 01:27:04 2013 +0400
@@ -10,3 +10,4 @@
 Implab.Fx/bin/
 Implab.Fx.Test/bin/
 Implab.Fx.Test/obj/
+_ReSharper.Implab/
--- a/Implab.Test/AsyncTests.cs	Mon Oct 21 02:34:31 2013 +0400
+++ b/Implab.Test/AsyncTests.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -1,101 +1,333 @@
 using System;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Implab;
 using System.Reflection;
 using System.Threading;
+using Implab.Parallels;
 
-namespace Implab.Tests
-{
-	[TestClass]
-	public class AsyncTests
-	{
-		[TestMethod]
-		public void ResolveTest ()
-		{
-			int res = -1;
-			var p = new Promise<int> ();
-			p.Then (x => res = x);
-			p.Resolve (100);
+namespace Implab.Test {
+    [TestClass]
+    public class AsyncTests {
+        [TestMethod]
+        public void ResolveTest() {
+            int res = -1;
+            var p = new Promise<int>();
+            p.Then(x => res = x);
+            p.Resolve(100);
+
+            Assert.AreEqual(res, 100);
+        }
+
+        [TestMethod]
+        public void RejectTest() {
+            int res = -1;
+            Exception err = null;
+
+            var p = new Promise<int>();
+            p.Then(x => res = x, e => err = e);
+            p.Reject(new ApplicationException("error"));
+
+            Assert.AreEqual(res, -1);
+            Assert.AreEqual(err.Message, "error");
+
+        }
+
+        [TestMethod]
+        public void JoinSuccessTest() {
+            var p = new Promise<int>();
+            p.Resolve(100);
+            Assert.AreEqual(p.Join(), 100);
+        }
 
-			Assert.AreEqual (res, 100);
-		}
+        [TestMethod]
+        public void JoinFailTest() {
+            var p = new Promise<int>();
+            p.Reject(new ApplicationException("failed"));
+
+            try {
+                p.Join();
+                throw new ApplicationException("WRONG!");
+            } catch (TargetInvocationException err) {
+                Assert.AreEqual(err.InnerException.Message, "failed");
+            } catch {
+                Assert.Fail("Got wrong excaption");
+            }
+        }
+
+        [TestMethod]
+        public void MapTest() {
+            var p = new Promise<int>();
+
+            var p2 = p.Map(x => x.ToString());
+            p.Resolve(100);
+
+            Assert.AreEqual(p2.Join(), "100");
+        }
+
+        [TestMethod]
+        public void FixErrorTest() {
+            var p = new Promise<int>();
+
+            var p2 = p.Error(e => 101);
+
+            p.Reject(new Exception());
+
+            Assert.AreEqual(p2.Join(), 101);
+        }
 
         [TestMethod]
-		public void RejectTest ()
-		{
-			int res = -1;
-			Exception err = null;
+        public void ChainTest() {
+            var p1 = new Promise<int>();
+
+            var p3 = p1.Chain(x => {
+                var p2 = new Promise<string>();
+                p2.Resolve(x.ToString());
+                return p2;
+            });
+
+            p1.Resolve(100);
 
-			var p = new Promise<int> ();
-			p.Then (x => res = x, e => err = e);
-			p.Reject (new ApplicationException ("error"));
+            Assert.AreEqual(p3.Join(), "100");
+        }
 
-			Assert.AreEqual (res, -1);
-			Assert.AreEqual (err.Message, "error");
+        [TestMethod]
+        public void PoolTest() {
+            var pid = Thread.CurrentThread.ManagedThreadId;
+            var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
 
-		}
+            Assert.AreNotEqual(pid, p.Join());
+        }
 
         [TestMethod]
-		public void JoinSuccessTest ()
-		{
-			var p = new Promise<int> ();
-			p.Resolve (100);
-			Assert.AreEqual (p.Join (), 100);
-		}
+        public void WorkerPoolSizeTest() {
+            var pool = new WorkerPool(5, 10, 0);
+
+            Assert.AreEqual(5, pool.ThreadCount);
+
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+
+            Assert.AreEqual(5, pool.ThreadCount);
+
+            for (int i = 0; i < 100; i++)
+                pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+            Thread.Sleep(100);
+            Assert.AreEqual(10, pool.ThreadCount);
+
+            pool.Dispose();
+        }
+
+        [TestMethod]
+        public void WorkerPoolCorrectTest() {
+            var pool = new WorkerPool(0,1000,100);
+
+            int iterations = 1000;
+            int pending = iterations;
+            var stop = new ManualResetEvent(false);
+
+            var count = 0;
+            for (int i = 0; i < iterations; i++) {
+                pool
+                    .Invoke(() => 1)
+                    .Then(x => Interlocked.Add(ref count, x))
+                    .Then(x => Math.Log10(x))
+                    .Anyway(() => {
+                        Interlocked.Decrement(ref pending);
+                        if (pending == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(iterations, count);
+            Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
+            pool.Dispose();
+            
+        }
+
+        [TestMethod]
+        public void WorkerPoolDisposeTest() {
+            var pool = new WorkerPool(5, 20);
+            Assert.AreEqual(5, pool.ThreadCount);
+            pool.Dispose();
+            Thread.Sleep(100);
+            Assert.AreEqual(0, pool.ThreadCount);
+            pool.Dispose();
+        }
 
         [TestMethod]
-		public void JoinFailTest ()
-		{
-			var p = new Promise<int> ();
-			p.Reject (new ApplicationException ("failed"));
+        public void MTQueueTest() {
+            var queue = new MTQueue<int>();
+            int res;
+
+            queue.Enqueue(10);
+            Assert.IsTrue(queue.TryDequeue(out res));
+            Assert.AreEqual(10, res);
+            Assert.IsFalse(queue.TryDequeue(out res));
+
+            for (int i = 0; i < 1000; i++)
+                queue.Enqueue(i);
+
+            for (int i = 0; i < 1000; i++) {
+                queue.TryDequeue(out res);
+                Assert.AreEqual(i, res);
+            }
+
+            int writers = 0;
+            int readers = 0;
+            var stop = new ManualResetEvent(false);
+            int total = 0;
+
+            int itemsPerWriter = 1000;
+            int writersCount = 3;
 
-			try {
-				p.Join ();
-				throw new ApplicationException ("WRONG!");
-			} catch (TargetInvocationException err) {
-				Assert.AreEqual (err.InnerException.Message, "failed");
-			} catch {
-				Assert.Fail ("Got wrong excaption");
-			}
-		}
+            for (int i = 0; i < writersCount; i++) {
+                Interlocked.Increment(ref writers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        for (int ii = 0; ii < itemsPerWriter; ii++) {
+                            queue.Enqueue(1);
+                        }
+                        return 1;
+                    })
+                    .Anyway(() => Interlocked.Decrement(ref writers));
+            }
+
+            for (int i = 0; i < 10; i++) {
+                Interlocked.Increment(ref readers);
+                var wn = i;
+                AsyncPool
+                    .InvokeNewThread(() => {
+                        int t;
+                        do {
+                            while (queue.TryDequeue(out t))
+                                Interlocked.Add(ref total, t);
+                        } while (writers > 0);
+                        return 1;
+                    })
+                    .Anyway(() => {
+                        Interlocked.Decrement(ref readers);
+                        if (readers == 0)
+                            stop.Set();
+                    });
+            }
+
+            stop.WaitOne();
+
+            Assert.AreEqual(itemsPerWriter * writersCount, total);
+        }
 
         [TestMethod]
-		public void MapTest ()
-		{
-			var p = new Promise<int> ();
+        public void ParallelMapTest() {
+
+            int count = 100000;
+
+            double[] args = new double[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = rand.NextDouble();
 
-			var p2 = p.Map (x => x.ToString ());
-			p.Resolve (100);
+            var t = Environment.TickCount;
+            var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
+
+            Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
 
-			Assert.AreEqual (p2.Join (), "100");
-		}
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
 
         [TestMethod]
-		public void ChainTest ()
-		{
-			var p1 = new Promise<int> ();
+        public void ChainedMapTest() {
+
+            using (var pool = new WorkerPool(8,100,0)) {
+                int count = 10000;
+
+                double[] args = new double[count];
+                var rand = new Random();
+
+                for (int i = 0; i < count; i++)
+                    args[i] = rand.NextDouble();
 
-			var p3 = p1.Chain (x => {
-				var p2 = new Promise<string> ();
-				p2.Resolve (x.ToString ());
-				return p2;
-			});
+                var t = Environment.TickCount;
+                var res = args
+                    .ChainedMap(
+                        x => pool.Invoke(
+                            () => Math.Sin(x * x)
+                        ),
+                        4
+                    )
+                    .Join();
 
-			p1.Resolve (100);
+                Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
 
-			Assert.AreEqual (p3.Join (), "100");
-		}
+                t = Environment.TickCount;
+                for (int i = 0; i < count; i++)
+                    Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
+                Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+                Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
+            }
+        }
 
         [TestMethod]
-		public void PoolTest ()
-		{
-			var pid = Thread.CurrentThread.ManagedThreadId;
-			var p = AsyncPool.Invoke (() => {
-				return Thread.CurrentThread.ManagedThreadId;
-			});
+        public void ParallelForEachTest() {
+
+            int count = 100000;
+
+            int[] args = new int[count];
+            var rand = new Random();
+
+            for (int i = 0; i < count; i++)
+                args[i] = (int)(rand.NextDouble() * 100);
+
+            int result = 0;
+
+            var t = Environment.TickCount;
+            args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
+
+            Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
+
+            int result2 = 0;
+
+            t = Environment.TickCount;
+            for (int i = 0; i < count; i++)
+                result2 += args[i];
+            Assert.AreEqual(result2, result);
+            Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
+        }
 
-			Assert.AreNotEqual (pid, p.Join ());
-		}
-	}
+        [TestMethod]
+        public void ComplexCase1Test() {
+            var flags = new bool[3];
+
+            // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
+
+            var p = PromiseHelper
+                .Sleep(200, "Alan")
+                .Cancelled(() => flags[0] = true)
+                .Chain(x =>
+                    PromiseHelper
+                        .Sleep(200, "Hi, " + x)
+                        .Map(y => y)
+                        .Cancelled(() => flags[1] = true)
+                )
+                .Cancelled(() => flags[2] = true);
+            Thread.Sleep(300);
+            p.Cancel();
+            try {
+                Assert.AreEqual(p.Join(), "Hi, Alan");
+                Assert.Fail("Shouldn't get here");
+            } catch (OperationCanceledException) {
+            }
+
+            Assert.IsFalse(flags[0]);
+            Assert.IsTrue(flags[1]);
+            Assert.IsTrue(flags[2]);
+        }
+    }
 }
 
--- a/Implab.Test/Implab.Test.csproj	Mon Oct 21 02:34:31 2013 +0400
+++ b/Implab.Test/Implab.Test.csproj	Fri Nov 08 01:27:04 2013 +0400
@@ -46,6 +46,7 @@
   </ItemGroup>
   <ItemGroup>
     <Compile Include="AsyncTests.cs" />
+    <Compile Include="PromiseHelper.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
   </ItemGroup>
   <ItemGroup>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab.Test/PromiseHelper.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,17 @@
+using Implab.Parallels;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Test {
+    class PromiseHelper {
+        public static Promise<T> Sleep<T>(int timeout, T retVal) {
+            return AsyncPool.Invoke(() => {
+                Thread.Sleep(timeout);
+                return retVal;
+            });
+        }
+    }
+}
Binary file Implab.suo has changed
Binary file Implab.v11.suo has changed
--- a/Implab/AsyncPool.cs	Mon Oct 21 02:34:31 2013 +0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,28 +0,0 @@
-using System;
-using System.Threading;
-
-namespace Implab {
-	/// <summary>
-	/// Класс для распаралеливания задач.
-	/// </summary>
-	/// <remarks>
-	/// Используя данный класс и лямда выражения можно распараллелить
-	/// вычисления, для этого используется концепция обещаний.
-	/// </remarks>
-	public static class AsyncPool {
-
-		public static Promise<T> Invoke<T>(Func<T> func) {
-			var p = new Promise<T>();
-
-			ThreadPool.QueueUserWorkItem(param => {
-				try {
-					p.Resolve(func());
-				} catch(Exception e) {
-					p.Reject(e);
-				}
-			});
-
-			return p;
-		}
-	}
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ICancellable.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,10 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface ICancellable {
+        bool Cancel();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IProgressHandler.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface IProgressHandler {
+        string Message {
+            get;
+            set;
+        }
+        float CurrentProgress {
+            get;
+            set;
+        }
+        void InitProgress(float current, float max, string message);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IProgressNotifier.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    public interface IProgressNotifier
+    {
+        event EventHandler<ValueEventArgs<string>> MessageUpdated;
+        event EventHandler<ValueEventArgs<float>> ProgressUpdated;
+        event EventHandler<ProgressInitEventArgs> ProgressInit;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IPromise.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,33 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    public interface IPromise: ICancellable
+    {
+        /// <summary>
+        /// Check whereather the promise has no more than one dependent promise.
+        /// </summary>
+        bool IsExclusive
+        {
+            get;
+        }
+
+        /// <summary>
+        /// The current state of the promise.
+        /// </summary>
+        PromiseState State
+        {
+            get;
+        }
+
+        /// <summary>
+        /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
+        /// handler will be invoked immediatelly.
+        /// </summary>
+        /// <param name="handler">The handler</param>
+        void HandleCancelled(Action handler);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ITaskController.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface ITaskController: IProgressHandler {
+        bool Cancelled {
+            get;
+        }
+    }
+}
--- a/Implab/Implab.csproj	Mon Oct 21 02:34:31 2013 +0400
+++ b/Implab/Implab.csproj	Fri Nov 08 01:27:04 2013 +0400
@@ -32,13 +32,25 @@
     <Reference Include="System" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="ICancellable.cs" />
+    <Compile Include="IProgressHandler.cs" />
+    <Compile Include="IProgressNotifier.cs" />
+    <Compile Include="IPromise.cs" />
+    <Compile Include="ITaskController.cs" />
+    <Compile Include="ManagedPromise.cs" />
+    <Compile Include="Parallels\DispatchPool.cs" />
+    <Compile Include="Parallels\ArrayTraits.cs" />
+    <Compile Include="Parallels\MTQueue.cs" />
+    <Compile Include="Parallels\WorkerPool.cs" />
+    <Compile Include="PromiseState.cs" />
+    <Compile Include="TaskController.cs" />
+    <Compile Include="ProgressInitEventArgs.cs" />
     <Compile Include="Properties\AssemblyInfo.cs" />
     <Compile Include="Promise.cs" />
-    <Compile Include="AsyncPool.cs" />
+    <Compile Include="Parallels\AsyncPool.cs" />
     <Compile Include="Safe.cs" />
+    <Compile Include="ValueEventArgs.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
-  <ItemGroup>
-    <Folder Include="Parallels\" />
-  </ItemGroup>
+  <ItemGroup />
 </Project>
\ No newline at end of file
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ManagedPromise.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+
+    /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
+        
+    }*/
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/ArrayTraits.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,171 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public static class ArrayTraits {
+        class ArrayIterator<TSrc> : DispatchPool<int> {
+            readonly Action<TSrc> m_action;
+            readonly TSrc[] m_source;
+            readonly Promise<int> m_promise = new Promise<int>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
+                : base(threads) {
+
+                Debug.Assert(source != null);
+                Debug.Assert(action != null);
+
+                m_next = 0;
+                m_source = source;
+                m_pending = source.Length;
+                m_action = action;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<int> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_action(m_source[unit]);
+                    var pending = Interlocked.Decrement(ref m_pending);
+                    if (pending == 0)
+                        m_promise.Resolve(m_source.Length);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
+            readonly Func<TSrc, TDst> m_transform;
+            readonly TSrc[] m_source;
+            readonly TDst[] m_dest;
+            readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
+
+            int m_pending;
+            int m_next;
+
+            public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
+                : base(threads) {
+
+                Debug.Assert (source != null);
+                Debug.Assert( transform != null);
+
+                m_next = 0;
+                m_source = source;
+                m_dest = new TDst[source.Length];
+                m_pending = source.Length;
+                m_transform = transform;
+
+                m_promise.Anyway(() => Dispose());
+                m_promise.Cancelled(() => Dispose());
+
+                InitPool();
+            }
+
+            public Promise<TDst[]> Promise {
+                get {
+                    return m_promise;
+                }
+            }
+
+            protected override bool TryDequeue(out int unit) {
+                unit = Interlocked.Increment(ref m_next) - 1;
+                return unit >= m_source.Length ? false : true;
+            }
+
+            protected override void InvokeUnit(int unit) {
+                try {
+                    m_dest[unit] = m_transform(m_source[unit]);
+                    var pending = Interlocked.Decrement(ref m_pending);
+                    if (pending == 0)
+                        m_promise.Resolve(m_dest);
+                } catch (Exception e) {
+                    m_promise.Reject(e);
+                }
+            }
+        }
+
+        public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+
+            var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
+            return mapper.Promise;
+        }
+
+        public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (action == null)
+                throw new ArgumentNullException("action");
+
+            var iter = new ArrayIterator<TSrc>(source, action, threads);
+            return iter.Promise;
+        }
+
+        public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (transform == null)
+                throw new ArgumentNullException("transform");
+            if (threads <= 0)
+                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
+
+            var promise = new Promise<TDst[]>();
+            var res = new TDst[source.Length];
+            var pending = source.Length;
+            var semaphore = new Semaphore(threads, threads);
+
+            AsyncPool.InvokeNewThread(() => {
+                for (int i = 0; i < source.Length; i++) {
+                    if(promise.State != PromiseState.Unresolved)
+                        break; // stop processing in case of error or cancellation
+                    var idx = i;
+                    semaphore.WaitOne();
+                    try {
+                        var p1 = transform(source[i]);
+                        p1.Anyway(() => semaphore.Release());
+                        p1.Cancelled(() => semaphore.Release());
+                        p1.Then(
+                            x => {
+                                res[idx] = x;
+                                var left = Interlocked.Decrement(ref pending);
+                                if (left == 0)
+                                    promise.Resolve(res);
+                            },
+                            e => promise.Reject(e)
+                        );
+
+                    } catch (Exception e) {
+                        promise.Reject(e);
+                    }
+                }
+                return 0;
+            });
+
+            return promise.Anyway(() => semaphore.Dispose());
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/AsyncPool.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,44 @@
+using System;
+using System.Threading;
+
+namespace Implab.Parallels {
+	/// <summary>
+	/// Класс для распаралеливания задач.
+	/// </summary>
+	/// <remarks>
+	/// Используя данный класс и лямда выражения можно распараллелить
+	/// вычисления, для этого используется концепция обещаний.
+	/// </remarks>
+	public static class AsyncPool {
+
+		public static Promise<T> Invoke<T>(Func<T> func) {
+			var p = new Promise<T>();
+
+			ThreadPool.QueueUserWorkItem(param => {
+				try {
+					p.Resolve(func());
+				} catch(Exception e) {
+					p.Reject(e);
+				}
+			});
+
+			return p;
+		}
+
+        public static Promise<T> InvokeNewThread<T>(Func<T> func) {
+            var p = new Promise<T>();
+
+            var worker = new Thread(() => {
+                try {
+                    p.Resolve(func());
+                } catch (Exception e) {
+                    p.Reject(e);
+                }
+            });
+            worker.IsBackground = true;
+            worker.Start();
+
+            return p;
+        }
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/DispatchPool.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,238 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public abstract class DispatchPool<TUnit> : IDisposable {
+        readonly int m_minThreads;
+        readonly int m_maxThreads;
+        int m_runningThreads = 0;
+        int m_maxRunningThreads = 0;
+        int m_suspended = 0;
+        int m_exitRequired = 0;
+        AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+
+        protected DispatchPool(int min, int max) {
+            if (min < 0)
+                throw new ArgumentOutOfRangeException("min");
+            if (max <= 0)
+                throw new ArgumentOutOfRangeException("max");
+
+            if (min > max)
+                min = max;
+            m_minThreads = min;
+            m_maxThreads = max;
+        }
+
+        protected DispatchPool(int threads)
+            : this(threads, threads) {
+        }
+
+        protected DispatchPool() {
+            int maxThreads, maxCP;
+            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
+
+            m_minThreads = 0;
+            m_maxThreads = maxThreads;
+        }
+
+        protected void InitPool() {
+            for (int i = 0; i < m_minThreads; i++)
+                StartWorker();
+        }
+
+        public int ThreadCount {
+            get {
+                return m_runningThreads;
+            }
+        }
+
+        public int MaxRunningThreads {
+            get {
+                return m_maxRunningThreads;
+            }
+        }
+
+        protected bool IsDisposed {
+            get {
+                return m_exitRequired != 0;
+            }
+        }
+
+        protected abstract bool TryDequeue(out TUnit unit);
+
+        protected virtual bool ExtendPool() {
+            if (m_suspended > 0) {
+                m_hasTasks.Set();
+                return true;
+            } else
+                return StartWorker();
+        }
+
+        /// <summary>
+        /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
+        /// </summary>
+        protected void WakePool() {
+            m_hasTasks.Set(); // wake sleeping thread;
+
+            if (AllocateThreadSlot(1)) {
+                var worker = new Thread(this.Worker);
+                worker.IsBackground = true;
+                worker.Start();
+            }
+        }
+
+        protected virtual void Suspend() {
+            m_hasTasks.WaitOne();
+        }
+
+        #region thread slots traits
+
+        bool AllocateThreadSlot() {
+            int current;
+            // use spins to allocate slot for the new thread
+            do {
+                current = m_runningThreads;
+                if (current >= m_maxThreads || m_exitRequired != 0)
+                    // no more slots left or the pool has been disposed
+                    return false;
+            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+
+            UpdateMaxThreads(current + 1);
+
+            return true;
+        }
+
+        bool AllocateThreadSlot(int desired) {
+            if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
+                return false;
+
+            UpdateMaxThreads(desired);
+
+            return true;
+        }
+
+        bool ReleaseThreadSlot(out bool last) {
+            last = false;
+            int current;
+            // use spins to release slot for the new thread
+            do {
+                current = m_runningThreads;
+                if (current <= m_minThreads && m_exitRequired == 0)
+                    // the thread is reserved
+                    return false;
+            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+
+            last = (current == 1);
+
+            return true;
+        }
+
+        /// <summary>
+        /// releases thread slot unconditionally, used during cleanup
+        /// </summary>
+        /// <returns>true - no more threads left</returns>
+        bool ReleaseThreadSlotAnyway() {
+            var left = Interlocked.Decrement(ref m_runningThreads);
+            return left == 0;
+        }
+
+        void UpdateMaxThreads(int count) {
+            int max;
+            do {
+                max = m_maxRunningThreads;
+                if (max >= count)
+                    break;
+            } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
+        }
+
+        #endregion
+
+        bool StartWorker() {
+            if (AllocateThreadSlot()) {
+                // slot successfully allocated
+                var worker = new Thread(this.Worker);
+                worker.IsBackground = true;
+                worker.Start();
+
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        bool FetchTask(out TUnit unit) {
+            do {
+                // exit if requested
+                if (m_exitRequired != 0) {
+                    // release the thread slot
+                    if (ReleaseThreadSlotAnyway()) // it was the last worker
+                        m_hasTasks.Dispose();
+                    else
+                        m_hasTasks.Set(); // wake next worker
+                    unit = default(TUnit);
+                    return false;
+                }
+
+                // fetch task
+                if (TryDequeue(out unit)) {
+                    ExtendPool();
+                    return true;
+                }
+
+                //no tasks left, exit if the thread is no longer needed
+                bool last;
+                if (ReleaseThreadSlot(out last)) {
+                    if (last && m_hasTasks.WaitOne(0)) {
+                        if (AllocateThreadSlot(1))
+                            continue; // spin again...
+                        else
+                            // we failed to reallocate slot for this thread
+                            // therefore we need to release the event
+                            m_hasTasks.Set(); 
+                    }
+
+                    return false;
+                }
+
+                // entering suspend state
+                Interlocked.Increment(ref m_suspended);
+                // keep this thread and wait                
+                Suspend();
+                Interlocked.Decrement(ref m_suspended);
+            } while (true);
+        }
+
+        protected abstract void InvokeUnit(TUnit unit);
+
+        void Worker() {
+            TUnit unit;
+            while (FetchTask(out unit))
+                InvokeUnit(unit);
+        }
+
+        protected virtual void Dispose(bool disposing) {
+            if (disposing) {
+                if (m_exitRequired == 0) {
+                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
+                        return;
+
+                    // wake sleeping threads
+                    m_hasTasks.Set();
+                    GC.SuppressFinalize(this);
+                }
+            }
+        }
+
+        public void Dispose() {
+            Dispose(true);
+        }
+
+        ~DispatchPool() {
+            Dispose(false);
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/MTQueue.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,74 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public class MTQueue<T> {
+        class Node {
+            public Node(T value) {
+                this.value = value;
+            }
+            public readonly T value;
+            public Node next;
+        }
+
+        Node m_first;
+        Node m_last;
+
+        public void Enqueue(T value) {
+            var last = m_last;
+            var next = new Node(value);
+
+            while (last != Interlocked.CompareExchange(ref m_last, next, last))
+                last = m_last;
+
+            if (last != null)
+                last.next = next;
+            else
+                m_first = next;
+        }
+
+        public bool TryDequeue(out T value) {
+            Node first;
+            Node next = null;
+            value = default(T);
+
+            do {
+                first = m_first;
+                if (first == null)
+                    return false;
+                next = first.next;
+                if (next == null) {
+                    // this is the last element,
+                    // then try to update tail
+                    if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
+                        // this is inconsistent situation which means that the queue is empty
+                        if (m_last == null)
+                            return false;
+                        // tail has been changed, that means that we need to restart
+                        continue; 
+                    }
+
+                    // tail succesfully updated and first.next will never be changed
+                    // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null
+                    // but the writer may update the m_first since the m_last is null
+
+                    // so we need to fix inconsistency by setting m_first to null, but if it already has been
+                    // updated by a writer then we should just give up
+                    Interlocked.CompareExchange(ref m_first, null, first);
+                    break;
+
+                } else {
+                        if (first == Interlocked.CompareExchange(ref m_first, next, first))
+                            // head succesfully updated
+                            break;
+                }
+            } while (true);
+
+            value = first.value;
+            return true;
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/WorkerPool.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,89 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public class WorkerPool : DispatchPool<Action> {
+
+        MTQueue<Action> m_queue = new MTQueue<Action>();
+        int m_queueLength = 0;
+        readonly int m_threshold = 1;
+
+        public WorkerPool(int minThreads, int maxThreads, int threshold)
+            : base(minThreads, maxThreads) {
+            m_threshold = threshold;
+            InitPool();
+        }
+
+        public WorkerPool(int minThreads, int maxThreads) :
+            base(minThreads, maxThreads) {
+            InitPool();
+        }
+
+        public WorkerPool(int threads)
+            : base(threads) {
+            InitPool();
+        }
+
+        public WorkerPool()
+            : base() {
+            InitPool();
+        }
+
+        public Promise<T> Invoke<T>(Func<T> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new Promise<T>();
+
+            EnqueueTask(delegate() {
+                try {
+                    promise.Resolve(task());
+                } catch (Exception e) {
+                    promise.Reject(e);
+                }
+            });
+
+            return promise;
+        }
+
+        protected void EnqueueTask(Action unit) {
+            Debug.Assert(unit != null);
+            var len = Interlocked.Increment(ref m_queueLength);
+            m_queue.Enqueue(unit);
+
+            if(!ExtendPool())
+                WakePool();
+        }
+
+        protected override bool ExtendPool() {
+            if (m_queueLength <= m_threshold*ThreadCount)
+                // in this case we are in active thread and it request for additional workers
+                // satisfy it only when queue is longer than threshold
+                return false;
+            return base.ExtendPool();
+        }
+
+        protected override bool TryDequeue(out Action unit) {
+            if (m_queue.TryDequeue(out unit)) {
+                Interlocked.Decrement(ref m_queueLength);
+                return true;
+            }
+            return false;
+        }
+
+        protected override void InvokeUnit(Action unit) {
+            unit();
+        }
+
+        protected override void Suspend() {
+            if (m_queueLength == 0)
+                base.Suspend();
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ProgressInitEventArgs.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,36 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    [Serializable]
+    public class ProgressInitEventArgs: EventArgs
+    {
+        public float MaxProgress
+        {
+            get;
+            private set;
+        }
+
+        public float CurrentProgress
+        {
+            get;
+            private set;
+        }
+
+        public string Message
+        {
+            get;
+            private set;
+        }
+
+        public ProgressInitEventArgs(float current, float max, string message)
+        {
+            this.MaxProgress = max;
+            this.CurrentProgress = current;
+            this.Message = message;
+        }
+    }
+}
--- a/Implab/Promise.cs	Mon Oct 21 02:34:31 2013 +0400
+++ b/Implab/Promise.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -1,18 +1,16 @@
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reflection;
-using System.Text;
 using System.Diagnostics;
 using System.Threading;
 
 namespace Implab {
 
     public delegate void ErrorHandler(Exception e);
-
-    public delegate void ResultHandler<T>(T result);
-    public delegate TNew ResultMapper<TSrc, TNew>(TSrc result);
-    public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result);
+    public delegate T ErrorHandler<out T>(Exception e);
+    public delegate void ResultHandler<in T>(T result);
+    public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
+    public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
 
     /// <summary>
     /// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -48,23 +46,23 @@
     /// только инициатор обещания иначе могут возникнуть противоречия.
     /// </para>
     /// </remarks>
-    public class Promise<T> {
+    public class Promise<T> : IPromise {
 
         struct ResultHandlerInfo {
             public ResultHandler<T> resultHandler;
             public ErrorHandler errorHandler;
         }
 
-        enum State {
-            Unresolved,
-            Resolving,
-            Resolved,
-            Cancelled
-        }
+        readonly IPromise m_parent;
+
+        LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
+        LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
 
-        LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>();
-        State m_state;
-        bool m_cancellable;
+        readonly object m_lock = new Object();
+        readonly bool m_cancellable;
+        int m_childrenCount = 0;
+
+        PromiseState m_state;
         T m_result;
         Exception m_error;
 
@@ -72,13 +70,17 @@
             m_cancellable = true;
         }
 
-        /// <summary>
-        /// Событие, возникающее при отмене асинхронной операции.
-        /// </summary>
-        /// <description>
-        /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
-        /// </description>
-        public event EventHandler Cancelled;
+        public Promise(IPromise parent, bool cancellable) {
+            m_cancellable = cancellable;
+            m_parent = parent;
+            if (parent != null)
+                parent.HandleCancelled(InternalCancel);
+        }
+
+        void InternalCancel() {
+            // don't try to cancel parent :)
+            Cancel(false);
+        }
 
         /// <summary>
         /// Выполняет обещание, сообщая об успешном выполнении.
@@ -86,38 +88,39 @@
         /// <param name="result">Результат выполнения.</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Resolve(T result) {
-            lock (this) {
-                if (m_state == State.Cancelled)
+            lock (m_lock) {
+                if (m_state == PromiseState.Cancelled)
                     return;
-                if (m_state != State.Unresolved)
+                if (m_state != PromiseState.Unresolved)
                     throw new InvalidOperationException("The promise is already resolved");
                 m_result = result;
-                m_state = State.Resolving;
+                m_state = PromiseState.Resolved;
             }
 
-            ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler);
+            OnStateChanged();
         }
 
         /// <summary>
         /// Выполняет обещание, сообщая об ошибке
         /// </summary>
+        /// <remarks>
+        /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
+        /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
+        /// будут проигнорированы.
+        /// </remarks>
         /// <param name="error">Исключение возникшее при выполнении операции</param>
         /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
         public void Reject(Exception error) {
-            lock (this) {
-                if (m_state == State.Cancelled)
+            lock (m_lock) {
+                if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
                     return;
-                if (m_state != State.Unresolved)
+                if (m_state != PromiseState.Unresolved)
                     throw new InvalidOperationException("The promise is already resolved");
                 m_error = error;
-                m_state = State.Resolving;
+                m_state = PromiseState.Rejected;
             }
 
-            ResultHandlerInfo handler;
-            while (FetchNextHandler(out handler))
-                InvokeHandler(handler);
+            OnStateChanged();
         }
 
         /// <summary>
@@ -125,47 +128,31 @@
         /// </summary>
         /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
         public bool Cancel() {
-            lock (this) {
-                if (m_state == State.Unresolved && m_cancellable) {
-                    m_state = State.Cancelled;
-                    EventHandler temp = Cancelled;
-
-                    if (temp != null)
-                        temp(this, new EventArgs());
-
-                    return true;
-                } else
-                    return false;
-            }
+            return Cancel(true);
         }
 
         /// <summary>
-        /// Добавляет обработчики событий выполнения обещания.
+        /// Adds new handlers to this promise.
         /// </summary>
-        /// <param name="success">Обработчик успешного выполнения обещания.
-        /// Данному обработчику будет передан результат выполнения операции.</param>
-        /// <param name="error">Обработчик ошибки. Данный обработчик получит
-        /// исключение возникшее при выполнении операции.</param>
-        /// <returns>Само обещание</returns>
+        /// <param name="success">The handler of the successfully completed operation.
+        /// This handler will recieve an operation result as a parameter.</param>
+        /// <param name="error">Handles an exception that may occur during the operation.</param>
+        /// <returns>The new promise chained to this one.</returns>
         public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
             if (success == null && error == null)
                 return this;
 
-            var medium = new Promise<T>();
+            var medium = new Promise<T>(this, true);
 
             var handlerInfo = new ResultHandlerInfo();
 
             if (success != null)
                 handlerInfo.resultHandler = x => {
-                    try {
-                        success(x);
-                        medium.Resolve(x);
-                    } catch (Exception e) {
-                        medium.Reject(e);
-                    }
+                    success(x);
+                    medium.Resolve(x);
                 };
             else
-                handlerInfo.resultHandler = x => medium.Resolve(x);
+                handlerInfo.resultHandler = medium.Resolve;
 
             if (error != null)
                 handlerInfo.errorHandler = x => {
@@ -175,21 +162,106 @@
                     medium.Reject(x);
                 };
             else
-                handlerInfo.errorHandler = x => medium.Reject(x);
+                handlerInfo.errorHandler = medium.Reject;
+
+            AddHandler(handlerInfo);
+
+            return medium;
+        }
+
+        /// <summary>
+        /// Adds new handlers to this promise.
+        /// </summary>
+        /// <param name="success">The handler of the successfully completed operation.
+        /// This handler will recieve an operation result as a parameter.</param>
+        /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
+        /// <returns>The new promise chained to this one.</returns>
+        public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
+            if (success == null && error == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            var handlerInfo = new ResultHandlerInfo();
+
+            if (success != null)
+                handlerInfo.resultHandler = x => {
+                    success(x);
+                    medium.Resolve(x);
+                };
+            else
+                handlerInfo.resultHandler = medium.Resolve;
+
+            if (error != null)
+                handlerInfo.errorHandler = x => {
+                    try {
+                        medium.Resolve(error(x));
+                    } catch { }
+                    medium.Reject(x);
+                };
+            else
+                handlerInfo.errorHandler = medium.Reject;
 
             AddHandler(handlerInfo);
 
             return medium;
         }
 
+
         public Promise<T> Then(ResultHandler<T> success) {
-            return Then(success, null);
+            if (success == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            var handlerInfo = new ResultHandlerInfo();
+
+            if (success != null)
+                handlerInfo.resultHandler = x => {
+                    success(x);
+                    medium.Resolve(x);
+                };
+            else
+                handlerInfo.resultHandler = medium.Resolve;
+
+            handlerInfo.errorHandler = medium.Reject;
+
+            AddHandler(handlerInfo);
+
+            return medium;
         }
 
         public Promise<T> Error(ErrorHandler error) {
             return Then(null, error);
         }
 
+        /// <summary>
+        /// Handles error and allows to keep the promise.
+        /// </summary>
+        /// <remarks>
+        /// If the specified handler throws an exception, this exception will be used to reject the promise.
+        /// </remarks>
+        /// <param name="handler">The error handler which returns the result of the promise.</param>
+        /// <returns>New promise.</returns>
+        public Promise<T> Error(ErrorHandler<T> handler) {
+            if (handler == null)
+                return this;
+
+            var medium = new Promise<T>(this, true);
+
+            AddHandler(new ResultHandlerInfo {
+                errorHandler = e => {
+                    try {
+                        medium.Resolve(handler(e));
+                    } catch (Exception e2) {
+                        medium.Reject(e2);
+                    }
+                }
+            });
+
+            return medium;
+        }
+
         public Promise<T> Anyway(Action handler) {
             if (handler == null)
                 return this;
@@ -198,6 +270,7 @@
 
             AddHandler(new ResultHandlerInfo {
                 resultHandler = x => {
+                    // to avoid handler being called multiple times we handle exception by ourselfs
                     try {
                         handler();
                         medium.Resolve(x);
@@ -229,20 +302,15 @@
                 throw new ArgumentNullException("mapper");
 
             // создаем прицепленное обещание
-            Promise<TNew> chained = new Promise<TNew>();
+            var chained = new Promise<TNew>();
 
             AddHandler(new ResultHandlerInfo() {
-                resultHandler = delegate(T result) {
-                    try {
-                        // если преобразование выдаст исключение, то сработает reject сцепленного deferred
-                        chained.Resolve(mapper(result));
-                    } catch (Exception e) {
-                        chained.Reject(e);
-                    }
-                },
+                resultHandler = result => chained.Resolve(mapper(result)),
                 errorHandler = delegate(Exception e) {
                     if (error != null)
-                        error(e);
+                        try {
+                            error(e);
+                        } catch { }
                     // в случае ошибки нужно передать исключение дальше по цепочке
                     chained.Reject(e);
                 }
@@ -271,19 +339,21 @@
             // создать посредника, к которому будут подвызяваться следующие обработчики.
             // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
             // передать через него результаты работы.
-            Promise<TNew> medium = new Promise<TNew>();
+            var medium = new Promise<TNew>(this, true);
 
-            AddHandler(new ResultHandlerInfo() {
+            AddHandler(new ResultHandlerInfo {
                 resultHandler = delegate(T result) {
-                    try {
-                        chained(result).Then(
-                            x => medium.Resolve(x),
-                            e => medium.Reject(e)
-                            );
-                    } catch (Exception e) {
-                        // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
-                        medium.Reject(e);
-                    }
+                    if (medium.State == PromiseState.Cancelled)
+                        return;
+
+                    var promise = chained(result);
+
+                    // notify chained operation that it's not needed
+                    medium.Cancelled(() => promise.Cancel());
+                    promise.Then(
+                        x => medium.Resolve(x),
+                        e => medium.Reject(e)
+                    );
                 },
                 errorHandler = delegate(Exception e) {
                     if (error != null)
@@ -300,6 +370,22 @@
             return Chain(chained, null);
         }
 
+        public Promise<T> Cancelled(Action handler) {
+            if (handler == null)
+                return this;
+            lock (m_lock) {
+                if (m_state == PromiseState.Unresolved)
+                    m_cancelHandlers.AddLast(handler);
+                else if (m_state == PromiseState.Cancelled)
+                    handler();
+            }
+            return this;
+        }
+
+        public void HandleCancelled(Action handler) {
+            Cancelled(handler);
+        }
+
         /// <summary>
         /// Дожидается отложенного обещания и в случае успеха, возвращает
         /// его, результат, в противном случае бросает исключение.
@@ -322,52 +408,37 @@
         /// <param name="timeout">Время ожидания</param>
         /// <returns>Результат выполнения обещания</returns>
         public T Join(int timeout) {
-            ManualResetEvent evt = new ManualResetEvent(false);
+            var evt = new ManualResetEvent(false);
             Anyway(() => evt.Set());
+            Cancelled(() => evt.Set());
 
             if (!evt.WaitOne(timeout, true))
                 throw new TimeoutException();
 
-            if (m_error != null)
-                throw new TargetInvocationException(m_error);
-            else
-                return m_result;
+            switch (State) {
+                case PromiseState.Resolved:
+                    return m_result;
+                case PromiseState.Cancelled:
+                    throw new OperationCanceledException();
+                case PromiseState.Rejected:
+                    throw new TargetInvocationException(m_error);
+                default:
+                    throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+            }
         }
 
         public T Join() {
             return Join(Timeout.Infinite);
         }
 
-        /// <summary>
-        /// Данный метод последовательно извлекает обработчики обещания и когда
-        /// их больше не осталось - ставит состояние "разрешено".
-        /// </summary>
-        /// <param name="handler">Информация об обработчике</param>
-        /// <returns>Признак того, что еще остались обработчики в очереди</returns>
-        bool FetchNextHandler(out ResultHandlerInfo handler) {
-            handler = default(ResultHandlerInfo);
-
-            lock (this) {
-                Debug.Assert(m_state == State.Resolving);
-
-                if (m_handlersChain.Count > 0) {
-                    handler = m_handlersChain.First.Value;
-                    m_handlersChain.RemoveFirst();
-                    return true;
-                } else {
-                    m_state = State.Resolved;
-                    return false;
-                }
-            }
-        }
-
         void AddHandler(ResultHandlerInfo handler) {
             bool invokeRequired = false;
 
-            lock (this) {
-                if (m_state != State.Resolved)
-                    m_handlersChain.AddLast(handler);
-                else
+            lock (m_lock) {
+                m_childrenCount++;
+                if (m_state == PromiseState.Unresolved) {
+                    m_resultHandlers.AddLast(handler);
+                } else
                     invokeRequired = true;
             }
 
@@ -377,21 +448,102 @@
         }
 
         void InvokeHandler(ResultHandlerInfo handler) {
-            if (m_error == null) {
-                try {
-                    if (handler.resultHandler != null)
-                        handler.resultHandler(m_result);
-                } catch { }
-            }
-
-            if (m_error != null) {
-                try {
-                    if (handler.errorHandler != null)
-                        handler.errorHandler(m_error);
-                } catch { }
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    try {
+                        if (handler.resultHandler != null)
+                            handler.resultHandler(m_result);
+                    } catch (Exception e) {
+                        try {
+                            if (handler.errorHandler != null)
+                                handler.errorHandler(e);
+                        } catch { }
+                    }
+                    break;
+                case PromiseState.Rejected:
+                    try {
+                        if (handler.errorHandler != null)
+                            handler.errorHandler(m_error);
+                    } catch { }
+                    break;
+                default:
+                    // do nothing
+                    return;
             }
         }
 
+        protected virtual void OnStateChanged() {
+            switch (m_state) {
+                case PromiseState.Resolved:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.resultHandler != null)
+                                resultHandlerInfo.resultHandler(m_result);
+                        } catch (Exception e) {
+                            try {
+                                if (resultHandlerInfo.errorHandler != null)
+                                    resultHandlerInfo.errorHandler(e);
+                            } catch { }
+                        }
+                    break;
+                case PromiseState.Cancelled:
+                    foreach (var cancelHandler in m_cancelHandlers)
+                        cancelHandler();
+                    break;
+                case PromiseState.Rejected:
+                    foreach (var resultHandlerInfo in m_resultHandlers)
+                        try {
+                            if (resultHandlerInfo.errorHandler != null)
+                                resultHandlerInfo.errorHandler(m_error);
+                        } catch { }
+                    break;
+                default:
+                    throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
+            }
+
+            m_resultHandlers = null;
+            m_cancelHandlers = null;
+        }
+
+
+
+        public bool IsExclusive {
+            get {
+                lock (m_lock) {
+                    return m_childrenCount <= 1;
+                }
+            }
+        }
+
+        public PromiseState State {
+            get {
+                lock (m_lock) {
+                    return m_state;
+                }
+            }
+        }
+
+        protected bool Cancel(bool dependencies) {
+            bool result;
+
+            lock (m_lock) {
+                if (m_state == PromiseState.Unresolved) {
+                    m_state = PromiseState.Cancelled;
+                    result = true;
+                } else {
+                    result = false;
+                }
+            }
+
+            if (result)
+                OnStateChanged();
+
+            if (dependencies && m_parent != null && m_parent.IsExclusive) {
+                m_parent.Cancel();
+            }
+
+            return result;
+        }
 
     }
 }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/PromiseState.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    public enum PromiseState
+    {
+        Unresolved,
+        Resolved,
+        Cancelled,
+        Rejected
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/TaskController.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,132 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab
+{
+    /// <summary>
+    /// This class allows to interact with asyncronuos task.
+    /// </summary>
+    /// <remarks>
+    /// Members of this object are thread safe.
+    /// </remarks>
+    class TaskController: IProgressNotifier, ITaskController, ICancellable
+    {
+        readonly object m_lock;
+        string m_message;
+
+        float m_current;
+        float m_max;
+
+        bool m_cancelled;
+
+        public event EventHandler<ValueEventArgs<string>> MessageUpdated;
+        public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
+        public event EventHandler<ProgressInitEventArgs> ProgressInit;
+
+        public TaskController()
+        {
+            m_lock = new Object();
+        }
+
+        public string Message
+        {
+            get
+            {
+                lock (m_lock)
+                    return m_message;
+            }
+            set
+            {
+                lock (m_lock)
+                {
+                    m_message = value;
+                    OnMessageUpdated();
+                }
+            }
+        }
+
+        public float CurrentProgress
+        {
+            get
+            {
+                lock (m_lock)
+                    return m_current;
+            }
+            set
+            {
+                lock (m_lock)
+                {
+                    var prev = m_current;
+                    m_current = value;
+                    if (m_current >= m_max)
+                        m_current = m_max;
+                    if (m_current != prev)
+                        OnProgressUpdated();
+                }
+            }
+        }
+
+        public void InitProgress(float current, float max, string message)
+        {
+            if (max < 0)
+                throw new ArgumentOutOfRangeException("max");
+            if (current < 0 || current > max)
+                throw new ArgumentOutOfRangeException("current");
+
+            lock(m_lock) {
+                m_current = current;
+                m_max = max;
+                m_message = message;
+                OnProgressInit();
+            }
+        }
+
+        public bool Cancelled {
+            get {
+                lock (m_lock)
+                    return m_cancelled;
+            }
+        }
+
+        public bool Cancel() {
+            lock (m_lock) {
+                if (!m_cancelled) {
+                    m_cancelled = true;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
+        protected virtual void OnMessageUpdated()
+        {
+            var temp = MessageUpdated;
+            if (temp != null)
+            {
+                temp(this, new ValueEventArgs<string>(m_message));
+            }
+        }
+
+        protected virtual void OnProgressUpdated()
+        {
+            var temp = ProgressUpdated;
+            if (temp != null)
+            {
+                temp(this,new ValueEventArgs<float>(m_current));
+            }
+        }
+
+        protected virtual void OnProgressInit()
+        {
+            var temp = ProgressInit;
+            if (temp != null)
+            {
+                temp(this, new ProgressInitEventArgs(m_current,m_max, m_message));
+            }
+        }
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ValueEventArgs.cs	Fri Nov 08 01:27:04 2013 +0400
@@ -0,0 +1,21 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    [Serializable]
+    public class ValueEventArgs<T>: EventArgs
+    {
+        public ValueEventArgs(T value)
+        {
+            this.Value = value;
+        }
+        public T Value
+        {
+            get;
+            private set;
+        }
+    }
+}