# HG changeset patch
# User cin
# Date 1384028493 -14400
# Node ID e3935fdf59a2a7ec7e38de609b39531519cf4ea3
# Parent 7cd4a843b4e4084855ab73d2a036bffaa0705f4f
Promise is rewritten to use interlocked operations instead of locks
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab.Test/AsyncTests.cs Sun Nov 10 00:21:33 2013 +0400
@@ -14,7 +14,7 @@
p.Then(x => res = x);
p.Resolve(100);
- Assert.AreEqual(res, 100);
+ Assert.AreEqual(100, res);
}
[TestMethod]
@@ -244,7 +244,7 @@
[TestMethod]
public void ChainedMapTest() {
- using (var pool = new WorkerPool(8,100,0)) {
+ using (var pool = new WorkerPool(4,4,0)) {
int count = 10000;
double[] args = new double[count];
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab.v11.suo
Binary file Implab.v11.suo has changed
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab/IPromise.cs
--- a/Implab/IPromise.cs Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/IPromise.cs Sun Nov 10 00:21:33 2013 +0400
@@ -15,19 +15,6 @@
get;
}
- ///
- /// The current state of the promise.
- ///
- PromiseState State
- {
- get;
- }
- ///
- /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
- /// handler will be invoked immediatelly.
- ///
- /// The handler
- void HandleCancelled(Action handler);
}
}
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab/Parallels/ArrayTraits.cs
--- a/Implab/Parallels/ArrayTraits.cs Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs Sun Nov 10 00:21:33 2013 +0400
@@ -140,7 +140,7 @@
AsyncPool.InvokeNewThread(() => {
for (int i = 0; i < source.Length; i++) {
- if(promise.State != PromiseState.Unresolved)
+ if(promise.IsResolved)
break; // stop processing in case of error or cancellation
var idx = i;
semaphore.WaitOne();
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab/Parallels/MTQueue.cs
--- a/Implab/Parallels/MTQueue.cs Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Parallels/MTQueue.cs Sun Nov 10 00:21:33 2013 +0400
@@ -42,12 +42,13 @@
next = first.next;
if (next == null) {
// this is the last element,
- // then try to update tail
+ // then try to update the tail
if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
- // this is inconsistent situation which means that the queue is empty
+ // this is a ace condition
if (m_last == null)
+ // the queue is empty
return false;
- // tail has been changed, that means that we need to restart
+ // tail has been changed, than we need to restart
continue;
}
diff -r 7cd4a843b4e4 -r e3935fdf59a2 Implab/Promise.cs
--- a/Implab/Promise.cs Fri Nov 08 01:25:42 2013 +0400
+++ b/Implab/Promise.cs Sun Nov 10 00:21:33 2013 +0400
@@ -3,6 +3,7 @@
using System.Reflection;
using System.Diagnostics;
using System.Threading;
+using Implab.Parallels;
namespace Implab {
@@ -48,24 +49,53 @@
///
public class Promise : IPromise {
- struct ResultHandlerInfo {
+ struct HandlerDescriptor {
public ResultHandler resultHandler;
public ErrorHandler errorHandler;
+ public Action cancellHandler;
+
+ public void Resolve(T result) {
+ if (resultHandler != null)
+ try {
+ resultHandler(result);
+ } catch (Exception e) {
+ Reject(e);
+ }
+ }
+
+ public void Reject(Exception err) {
+ if (errorHandler != null)
+ try {
+ errorHandler(err);
+ } catch {
+ }
+ }
+
+ public void Cancel() {
+ if (cancellHandler != null)
+ try {
+ cancellHandler();
+ } catch {
+ }
+ }
}
+ const int UnresolvedSate = 0;
+ const int TransitionalState = 1;
+ const int ResolvedState = 2;
+ const int RejectedState = 3;
+ const int CancelledState = 4;
+
readonly IPromise m_parent;
-
- LinkedList m_resultHandlers = new LinkedList();
- LinkedList m_cancelHandlers = new LinkedList();
+ readonly bool m_cancellable;
- readonly object m_lock = new Object();
- readonly bool m_cancellable;
int m_childrenCount = 0;
-
- PromiseState m_state;
+ int m_state;
T m_result;
Exception m_error;
+ readonly MTQueue m_handlers = new MTQueue();
+
public Promise() {
m_cancellable = true;
}
@@ -73,8 +103,6 @@
public Promise(IPromise parent, bool cancellable) {
m_cancellable = cancellable;
m_parent = parent;
- if (parent != null)
- parent.HandleCancelled(InternalCancel);
}
void InternalCancel() {
@@ -82,22 +110,39 @@
Cancel(false);
}
+ bool BeginTransit() {
+ return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
+ }
+
+ void CompleteTransit(int state) {
+ if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
+ throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+ }
+
+ public bool IsResolved {
+ get {
+ return m_state > 1;
+ }
+ }
+
+ public bool IsCancelled {
+ get {
+ return m_state == CancelledState;
+ }
+ }
+
///
/// Выполняет обещание, сообщая об успешном выполнении.
///
/// Результат выполнения.
/// Данное обещание уже выполнено
public void Resolve(T result) {
- lock (m_lock) {
- if (m_state == PromiseState.Cancelled)
- return;
- if (m_state != PromiseState.Unresolved)
- throw new InvalidOperationException("The promise is already resolved");
+ if (BeginTransit()) {
m_result = result;
- m_state = PromiseState.Resolved;
- }
-
- OnStateChanged();
+ CompleteTransit(ResolvedState);
+ OnStateChanged();
+ } else if (m_state != CancelledState)
+ throw new InvalidOperationException("The promise is already resolved");
}
///
@@ -111,16 +156,12 @@
/// Исключение возникшее при выполнении операции
/// Данное обещание уже выполнено
public void Reject(Exception error) {
- lock (m_lock) {
- if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
- return;
- if (m_state != PromiseState.Unresolved)
- throw new InvalidOperationException("The promise is already resolved");
+ if (BeginTransit()) {
m_error = error;
- m_state = PromiseState.Rejected;
- }
-
- OnStateChanged();
+ CompleteTransit(RejectedState);
+ OnStateChanged();
+ } else if (m_state == ResolvedState)
+ throw new InvalidOperationException("The promise is already resolved");
}
///
@@ -144,27 +185,27 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
-
+ ResultHandler resultHandler;
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
+ ErrorHandler errorHandler;
if (error != null)
- handlerInfo.errorHandler = x => {
+ errorHandler = x => {
try {
error(x);
} catch { }
medium.Reject(x);
};
else
- handlerInfo.errorHandler = medium.Reject;
+ errorHandler = medium.Reject;
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, errorHandler, medium.InternalCancel);
return medium;
}
@@ -182,27 +223,28 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
+ ResultHandler resultHandler;
+ ErrorHandler errorHandler;
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
if (error != null)
- handlerInfo.errorHandler = x => {
+ errorHandler = x => {
try {
medium.Resolve(error(x));
} catch { }
medium.Reject(x);
};
else
- handlerInfo.errorHandler = medium.Reject;
+ errorHandler = medium.Reject;
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, errorHandler, medium.InternalCancel);
return medium;
}
@@ -214,19 +256,17 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
-
+ ResultHandler resultHandler;
+
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
- handlerInfo.errorHandler = medium.Reject;
-
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
return medium;
}
@@ -249,15 +289,17 @@
var medium = new Promise(this, true);
- AddHandler(new ResultHandlerInfo {
- errorHandler = e => {
+ AddHandler(
+ null,
+ e => {
try {
medium.Resolve(handler(e));
} catch (Exception e2) {
medium.Reject(e2);
}
- }
- });
+ },
+ medium.InternalCancel
+ );
return medium;
}
@@ -268,8 +310,8 @@
var medium = new Promise();
- AddHandler(new ResultHandlerInfo {
- resultHandler = x => {
+ AddHandler(
+ x => {
// to avoid handler being called multiple times we handle exception by ourselfs
try {
handler();
@@ -278,13 +320,16 @@
medium.Reject(e);
}
},
- errorHandler = x => {
+
+ e => {
try {
handler();
} catch { }
- medium.Reject(x);
- }
- });
+ medium.Reject(e);
+ },
+
+ medium.InternalCancel
+ );
return medium;
}
@@ -304,17 +349,22 @@
// создаем прицепленное обещание
var chained = new Promise();
- AddHandler(new ResultHandlerInfo() {
- resultHandler = result => chained.Resolve(mapper(result)),
- errorHandler = delegate(Exception e) {
- if (error != null)
- try {
- error(e);
- } catch { }
- // в случае ошибки нужно передать исключение дальше по цепочке
- chained.Reject(e);
- }
- });
+ ResultHandler resultHandler = result => chained.Resolve(mapper(result));
+ ErrorHandler errorHandler = delegate(Exception e) {
+ if (error != null)
+ try {
+ error(e);
+ } catch { }
+ // в случае ошибки нужно передать исключение дальше по цепочке
+ chained.Reject(e);
+ };
+
+
+ AddHandler(
+ resultHandler,
+ errorHandler,
+ chained.InternalCancel
+ );
return chained;
}
@@ -341,27 +391,32 @@
// передать через него результаты работы.
var medium = new Promise(this, true);
- AddHandler(new ResultHandlerInfo {
- resultHandler = delegate(T result) {
- if (medium.State == PromiseState.Cancelled)
- return;
+ ResultHandler resultHandler = delegate(T result) {
+ if (medium.IsCancelled)
+ return;
- var promise = chained(result);
+ var promise = chained(result);
- // notify chained operation that it's not needed
- medium.Cancelled(() => promise.Cancel());
- promise.Then(
- x => medium.Resolve(x),
- e => medium.Reject(e)
- );
- },
- errorHandler = delegate(Exception e) {
- if (error != null)
- error(e);
- // в случае ошибки нужно передать исключение дальше по цепочке
- medium.Reject(e);
- }
- });
+ // notify chained operation that it's not needed
+ medium.Cancelled(() => promise.Cancel());
+ promise.Then(
+ x => medium.Resolve(x),
+ e => medium.Reject(e)
+ );
+ };
+
+ ErrorHandler errorHandler = delegate(Exception e) {
+ if (error != null)
+ error(e);
+ // в случае ошибки нужно передать исключение дальше по цепочке
+ medium.Reject(e);
+ };
+
+ AddHandler(
+ resultHandler,
+ errorHandler,
+ medium.InternalCancel
+ );
return medium;
}
@@ -371,19 +426,19 @@
}
public Promise Cancelled(Action handler) {
- if (handler == null)
- return this;
- lock (m_lock) {
- if (m_state == PromiseState.Unresolved)
- m_cancelHandlers.AddLast(handler);
- else if (m_state == PromiseState.Cancelled)
- handler();
- }
+ AddHandler(null, null, handler);
return this;
}
- public void HandleCancelled(Action handler) {
- Cancelled(handler);
+ public Promise Finally(Action handler) {
+ if (handler == null)
+ throw new ArgumentNullException("handler");
+ AddHandler(
+ x => handler(),
+ e => handler(),
+ handler
+ );
+ return this;
}
///
@@ -415,15 +470,15 @@
if (!evt.WaitOne(timeout, true))
throw new TimeoutException();
- switch (State) {
- case PromiseState.Resolved:
+ switch (m_state) {
+ case ResolvedState:
return m_result;
- case PromiseState.Cancelled:
+ case CancelledState:
throw new OperationCanceledException();
- case PromiseState.Rejected:
+ case RejectedState:
throw new TargetInvocationException(m_error);
default:
- throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+ throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
}
}
@@ -431,40 +486,45 @@
return Join(Timeout.Infinite);
}
- void AddHandler(ResultHandlerInfo handler) {
- bool invokeRequired = false;
+ void AddHandler(ResultHandler success, ErrorHandler error, Action cancel) {
+ Interlocked.Increment(ref m_childrenCount);
+
+ HandlerDescriptor handler = new HandlerDescriptor {
+ resultHandler = success,
+ errorHandler = error,
+ cancellHandler = cancel
+ };
- lock (m_lock) {
- m_childrenCount++;
- if (m_state == PromiseState.Unresolved) {
- m_resultHandlers.AddLast(handler);
- } else
- invokeRequired = true;
+ bool queued;
+
+ if (!IsResolved) {
+ m_handlers.Enqueue(handler);
+ queued = true;
+ } else {
+ // the promise is in resolved state, just invoke the handled with minimum overhead
+ queued = false;
+ InvokeHandler(handler);
}
- // обработчики не должны блокировать сам объект
- if (invokeRequired)
+ if (queued && IsResolved && m_handlers.TryDequeue(out handler))
+ // if the promise have been resolved while we was adding handler to the queue
+ // we can't guarantee that someone is still processing it
+ // therefore we will fetch a handler from the queue and execute it
+ // note that fetched handler may be not the one we have added
InvokeHandler(handler);
+
}
- void InvokeHandler(ResultHandlerInfo handler) {
+ void InvokeHandler(HandlerDescriptor handler) {
switch (m_state) {
- case PromiseState.Resolved:
- try {
- if (handler.resultHandler != null)
- handler.resultHandler(m_result);
- } catch (Exception e) {
- try {
- if (handler.errorHandler != null)
- handler.errorHandler(e);
- } catch { }
- }
+ case ResolvedState:
+ handler.Resolve(m_result);
break;
- case PromiseState.Rejected:
- try {
- if (handler.errorHandler != null)
- handler.errorHandler(m_error);
- } catch { }
+ case RejectedState:
+ handler.Reject(m_error);
+ break;
+ case CancelledState:
+ handler.Cancel();
break;
default:
// do nothing
@@ -473,76 +533,31 @@
}
protected virtual void OnStateChanged() {
- switch (m_state) {
- case PromiseState.Resolved:
- foreach (var resultHandlerInfo in m_resultHandlers)
- try {
- if (resultHandlerInfo.resultHandler != null)
- resultHandlerInfo.resultHandler(m_result);
- } catch (Exception e) {
- try {
- if (resultHandlerInfo.errorHandler != null)
- resultHandlerInfo.errorHandler(e);
- } catch { }
- }
- break;
- case PromiseState.Cancelled:
- foreach (var cancelHandler in m_cancelHandlers)
- cancelHandler();
- break;
- case PromiseState.Rejected:
- foreach (var resultHandlerInfo in m_resultHandlers)
- try {
- if (resultHandlerInfo.errorHandler != null)
- resultHandlerInfo.errorHandler(m_error);
- } catch { }
- break;
- default:
- throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
- }
-
- m_resultHandlers = null;
- m_cancelHandlers = null;
+ HandlerDescriptor handler;
+ while (m_handlers.TryDequeue(out handler))
+ InvokeHandler(handler);
}
public bool IsExclusive {
get {
- lock (m_lock) {
- return m_childrenCount <= 1;
- }
- }
- }
-
- public PromiseState State {
- get {
- lock (m_lock) {
- return m_state;
- }
+ return m_childrenCount <= 1;
}
}
protected bool Cancel(bool dependencies) {
- bool result;
-
- lock (m_lock) {
- if (m_state == PromiseState.Unresolved) {
- m_state = PromiseState.Cancelled;
- result = true;
- } else {
- result = false;
- }
- }
-
- if (result)
+ if (BeginTransit()) {
+ CompleteTransit(CancelledState);
OnStateChanged();
- if (dependencies && m_parent != null && m_parent.IsExclusive) {
- m_parent.Cancel();
+ if (dependencies && m_parent != null && m_parent.IsExclusive)
+ m_parent.Cancel();
+
+ return true;
+ } else {
+ return false;
}
-
- return result;
}
}