# HG changeset patch
# User cin
# Date 1383487658 -14400
# Node ID aa33d0bb8c0cbb2aca37c657e35aedbfdc896b23
# Parent c82e0dfbb4ddf9ae375cf213d9370b9d5835f268
implemeted new cancellable promises concept
diff -r c82e0dfbb4dd -r aa33d0bb8c0c .hgignore
--- a/.hgignore Sat Nov 02 00:55:47 2013 +0400
+++ b/.hgignore Sun Nov 03 18:07:38 2013 +0400
@@ -10,3 +10,4 @@
Implab.Fx/bin/
Implab.Fx.Test/bin/
Implab.Fx.Test/obj/
+_ReSharper.Implab/
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab.Test/AsyncTests.cs Sun Nov 03 18:07:38 2013 +0400
@@ -1,10 +1,9 @@
using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
-using Implab;
using System.Reflection;
using System.Threading;
-namespace Implab.Tests
+namespace Implab.Test
{
[TestClass]
public class AsyncTests
@@ -90,12 +89,39 @@
public void PoolTest ()
{
var pid = Thread.CurrentThread.ManagedThreadId;
- var p = AsyncPool.Invoke (() => {
- return Thread.CurrentThread.ManagedThreadId;
- });
+ var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId);
Assert.AreNotEqual (pid, p.Join ());
}
+
+ [TestMethod]
+ public void ComplexCase1Test() {
+ var flags = new bool[3];
+
+ // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
+
+ var p = PromiseHelper
+ .Sleep(200, "Alan")
+ .Cancelled(() => flags[0] = true)
+ .Chain(x =>
+ PromiseHelper
+ .Sleep(200, "Hi, " + x)
+ .Map( y => y )
+ .Cancelled(() => flags[1] = true)
+ )
+ .Cancelled(() => flags[2] = true);
+ Thread.Sleep(300);
+ p.Cancel();
+ try {
+ Assert.AreEqual(p.Join(), "Hi, Alan");
+ Assert.Fail("Shouldn't get here");
+ } catch(OperationCanceledException) {
+ }
+
+ Assert.IsFalse(flags[0]);
+ Assert.IsTrue(flags[1]);
+ Assert.IsTrue(flags[2]);
+ }
}
}
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab.Test/Implab.Test.csproj
--- a/Implab.Test/Implab.Test.csproj Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab.Test/Implab.Test.csproj Sun Nov 03 18:07:38 2013 +0400
@@ -46,6 +46,7 @@
+
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab.Test/PromiseHelper.cs
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab.Test/PromiseHelper.cs Sun Nov 03 18:07:38 2013 +0400
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+
+namespace Implab.Test {
+ class PromiseHelper {
+ public static Promise Sleep(int timeout, T retVal) {
+ return AsyncPool.Invoke(() => {
+ Thread.Sleep(timeout);
+ return retVal;
+ });
+ }
+ }
+}
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab.suo
Binary file Implab.suo has changed
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab/IPromise.cs
--- a/Implab/IPromise.cs Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/IPromise.cs Sun Nov 03 18:07:38 2013 +0400
@@ -29,5 +29,12 @@
/// Try to cancel the whole promise chain, the parent promise will be cancelled only if it has only one promise
///
bool Cancel(bool dependencies);
+
+ ///
+ /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
+ /// handler will be invoked immediatelly.
+ ///
+ /// The handler
+ void HandleCancelled(Action handler);
}
}
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab/Promise.cs
--- a/Implab/Promise.cs Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/Promise.cs Sun Nov 03 18:07:38 2013 +0400
@@ -1,8 +1,6 @@
using System;
using System.Collections.Generic;
-using System.Linq;
using System.Reflection;
-using System.Text;
using System.Diagnostics;
using System.Threading;
@@ -10,9 +8,9 @@
public delegate void ErrorHandler(Exception e);
- public delegate void ResultHandler(T result);
- public delegate TNew ResultMapper(TSrc result);
- public delegate Promise ChainedOperation(TSrc result);
+ public delegate void ResultHandler(T result);
+ public delegate TNew ResultMapper(TSrc result);
+ public delegate Promise ChainedOperation(TSrc result);
///
/// Класс для асинхронного получения результатов. Так называемое "обещание".
@@ -55,20 +53,19 @@
public ErrorHandler errorHandler;
}
- IPromise m_parent;
+ readonly IPromise m_parent;
LinkedList m_resultHandlers = new LinkedList();
LinkedList m_cancelHandlers = new LinkedList();
- object m_lock = new Object();
- bool m_cancellable;
+ readonly object m_lock = new Object();
+ readonly bool m_cancellable;
+ int m_childrenCount = 0;
PromiseState m_state;
T m_result;
Exception m_error;
- int m_childrenCount;
-
public Promise() {
m_cancellable = true;
}
@@ -76,15 +73,14 @@
public Promise(IPromise parent, bool cancellable) {
m_cancellable = cancellable;
m_parent = parent;
+ if (parent != null)
+ parent.HandleCancelled(InternalCancel);
}
- ///
- /// Событие, возникающее при отмене асинхронной операции.
- ///
- ///
- /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить.
- ///
- public event EventHandler Cancelled;
+ void InternalCancel() {
+ // don't try to cancel parent :)
+ Cancel(false);
+ }
///
/// Выполняет обещание, сообщая об успешном выполнении.
@@ -101,14 +97,7 @@
m_state = PromiseState.Resolved;
}
- // state has been changed to rejected new handlers can't be added
-
- foreach (var handler in m_resultHandlers)
- InvokeHandler(handler);
-
- /* ResultHandlerInfo handler;
- while (FetchNextHandler(out handler))
- InvokeHandler(handler); */
+ OnStateChanged();
}
///
@@ -126,14 +115,7 @@
m_state = PromiseState.Rejected;
}
- // state has been changed to rejected new handlers can't be added
-
- foreach (var handler in m_resultHandlers)
- InvokeHandler(handler);
-
- /*ResultHandlerInfo handler;
- while (FetchNextHandler(out handler))
- InvokeHandler(handler);*/
+ OnStateChanged();
}
///
@@ -144,6 +126,39 @@
return Cancel(true);
}
+ protected virtual void OnStateChanged() {
+ switch (m_state) {
+ case PromiseState.Resolved:
+ foreach (var resultHandlerInfo in m_resultHandlers)
+ try {
+ if (resultHandlerInfo.resultHandler != null)
+ resultHandlerInfo.resultHandler(m_result);
+ } catch (Exception e) {
+ try {
+ if (resultHandlerInfo.errorHandler != null)
+ resultHandlerInfo.errorHandler(e);
+ } catch { }
+ }
+ break;
+ case PromiseState.Cancelled:
+ foreach (var cancelHandler in m_cancelHandlers)
+ cancelHandler();
+ break;
+ case PromiseState.Rejected:
+ foreach (var resultHandlerInfo in m_resultHandlers)
+ try {
+ if (resultHandlerInfo.errorHandler != null)
+ resultHandlerInfo.errorHandler(m_error);
+ } catch { }
+ break;
+ default:
+ throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
+ }
+
+ m_resultHandlers = null;
+ m_cancelHandlers = null;
+ }
+
///
/// Добавляет обработчики событий выполнения обещания.
///
@@ -162,15 +177,11 @@
if (success != null)
handlerInfo.resultHandler = x => {
- try {
- success(x);
- medium.Resolve(x);
- } catch (Exception e) {
- medium.Reject(e);
- }
+ success(x);
+ medium.Resolve(x);
};
else
- handlerInfo.resultHandler = x => medium.Resolve(x);
+ handlerInfo.resultHandler = medium.Resolve;
if (error != null)
handlerInfo.errorHandler = x => {
@@ -180,7 +191,7 @@
medium.Reject(x);
};
else
- handlerInfo.errorHandler = x => medium.Reject(x);
+ handlerInfo.errorHandler = medium.Reject;
AddHandler(handlerInfo);
@@ -203,6 +214,7 @@
AddHandler(new ResultHandlerInfo {
resultHandler = x => {
+ // to avoid handler being called multiple times we handle exception by ourselfs
try {
handler();
medium.Resolve(x);
@@ -234,20 +246,15 @@
throw new ArgumentNullException("mapper");
// создаем прицепленное обещание
- Promise chained = new Promise();
+ var chained = new Promise();
AddHandler(new ResultHandlerInfo() {
- resultHandler = delegate(T result) {
- try {
- // если преобразование выдаст исключение, то сработает reject сцепленного deferred
- chained.Resolve(mapper(result));
- } catch (Exception e) {
- chained.Reject(e);
- }
- },
+ resultHandler = result => chained.Resolve(mapper(result)),
errorHandler = delegate(Exception e) {
if (error != null)
- error(e);
+ try {
+ error(e);
+ } catch { }
// в случае ошибки нужно передать исключение дальше по цепочке
chained.Reject(e);
}
@@ -276,19 +283,21 @@
// создать посредника, к которому будут подвызяваться следующие обработчики.
// когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
// передать через него результаты работы.
- Promise medium = new Promise();
+ var medium = new Promise(this, true);
- AddHandler(new ResultHandlerInfo() {
+ AddHandler(new ResultHandlerInfo {
resultHandler = delegate(T result) {
- try {
- chained(result).Then(
- x => medium.Resolve(x),
- e => medium.Reject(e)
- );
- } catch (Exception e) {
- // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке
- medium.Reject(e);
- }
+ if (medium.State == PromiseState.Cancelled)
+ return;
+
+ var promise = chained(result);
+
+ // notify chained operation that it's not needed
+ medium.Cancelled(() => promise.Cancel());
+ promise.Then(
+ medium.Resolve,
+ medium.Reject
+ );
},
errorHandler = delegate(Exception e) {
if (error != null)
@@ -305,6 +314,22 @@
return Chain(chained, null);
}
+ public Promise Cancelled(Action handler) {
+ if (handler == null)
+ return this;
+ lock (m_lock) {
+ if (m_state == PromiseState.Unresolved)
+ m_cancelHandlers.AddLast(handler);
+ else if (m_state == PromiseState.Cancelled)
+ handler();
+ }
+ return this;
+ }
+
+ public void HandleCancelled(Action handler) {
+ Cancelled(handler);
+ }
+
///
/// Дожидается отложенного обещания и в случае успеха, возвращает
/// его, результат, в противном случае бросает исключение.
@@ -327,51 +352,37 @@
/// Время ожидания
/// Результат выполнения обещания
public T Join(int timeout) {
- ManualResetEvent evt = new ManualResetEvent(false);
+ var evt = new ManualResetEvent(false);
Anyway(() => evt.Set());
+ Cancelled(() => evt.Set());
if (!evt.WaitOne(timeout, true))
throw new TimeoutException();
- if (m_error != null)
- throw new TargetInvocationException(m_error);
- else
- return m_result;
+ switch (State) {
+ case PromiseState.Resolved:
+ return m_result;
+ case PromiseState.Cancelled:
+ throw new OperationCanceledException();
+ case PromiseState.Rejected:
+ throw new TargetInvocationException(m_error);
+ default:
+ throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+ }
}
public T Join() {
return Join(Timeout.Infinite);
}
- ///
- /// Данный метод последовательно извлекает обработчики обещания и когда
- /// их больше не осталось - ставит состояние "разрешено".
- ///
- /// Информация об обработчике
- /// Признак того, что еще остались обработчики в очереди
- bool FetchNextHandler(out ResultHandlerInfo handler) {
- handler = default(ResultHandlerInfo);
-
- lock (this) {
- Debug.Assert(m_state != PromiseState.Unresolved);
-
- if (m_resultHandlers.Count > 0) {
- handler = m_resultHandlers.First.Value;
- m_resultHandlers.RemoveFirst();
- return true;
- } else {
- return false;
- }
- }
- }
-
void AddHandler(ResultHandlerInfo handler) {
bool invokeRequired = false;
- lock (this) {
- if (m_state == PromiseState.Unresolved)
+ lock (m_lock) {
+ m_childrenCount++;
+ if (m_state == PromiseState.Unresolved) {
m_resultHandlers.AddLast(handler);
- else
+ } else
invokeRequired = true;
}
@@ -381,18 +392,27 @@
}
void InvokeHandler(ResultHandlerInfo handler) {
- if (m_error == null) {
- try {
- if (handler.resultHandler != null)
- handler.resultHandler(m_result);
- } catch { }
- }
-
- if (m_error != null) {
- try {
- if (handler.errorHandler != null)
- handler.errorHandler(m_error);
- } catch { }
+ switch (m_state) {
+ case PromiseState.Resolved:
+ try {
+ if (handler.resultHandler != null)
+ handler.resultHandler(m_result);
+ } catch (Exception e) {
+ try {
+ if (handler.errorHandler != null)
+ handler.errorHandler(e);
+ } catch { }
+ }
+ break;
+ case PromiseState.Rejected:
+ try {
+ if (handler.errorHandler != null)
+ handler.errorHandler(m_error);
+ } catch { }
+ break;
+ default:
+ // do nothing
+ return;
}
}
@@ -426,15 +446,11 @@
}
}
- if (dependencies && m_parent != null && m_parent.IsExclusive) {
- // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive)
- m_parent.Cancel(true);
- }
+ if (result)
+ OnStateChanged();
- if (result) {
- // state has been changed to cancelled, new handlers can't be added
- foreach (var handler in m_cancelHandlers)
- handler();
+ if (dependencies && m_parent != null && m_parent.IsExclusive) {
+ m_parent.Cancel(true);
}
return result;
diff -r c82e0dfbb4dd -r aa33d0bb8c0c Implab/TaskController.cs
--- a/Implab/TaskController.cs Sat Nov 02 00:55:47 2013 +0400
+++ b/Implab/TaskController.cs Sun Nov 03 18:07:38 2013 +0400
@@ -14,9 +14,8 @@
///
class TaskController
{
- object m_lock;
+ readonly object m_lock;
string m_message;
- bool m_cancelled;
float m_current;
float m_max;