# HG changeset patch
# User cin
# Date 1384337000 -14400
# Node ID f0568ff069a51973c64a7a6093b48c751a6cb198
# Parent 0c924dff549845e943fccd50adc8239c1930f758# Parent 5a35900264f5d7a3e7a5adf452791da58790804d
Слияние с promises
diff -r 0c924dff5498 -r f0568ff069a5 Implab.Test/AsyncTests.cs
--- a/Implab.Test/AsyncTests.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab.Test/AsyncTests.cs Wed Nov 13 14:03:20 2013 +0400
@@ -14,7 +14,7 @@
p.Then(x => res = x);
p.Resolve(100);
- Assert.AreEqual(res, 100);
+ Assert.AreEqual(100, res);
}
[TestMethod]
@@ -101,18 +101,18 @@
public void WorkerPoolSizeTest() {
var pool = new WorkerPool(5, 10, 0);
- Assert.AreEqual(5, pool.ThreadCount);
+ Assert.AreEqual(5, pool.PoolSize);
- pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
- pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
- pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
+ pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+ pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+ pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
- Assert.AreEqual(5, pool.ThreadCount);
+ Assert.AreEqual(5, pool.PoolSize);
for (int i = 0; i < 100; i++)
- pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
- Thread.Sleep(100);
- Assert.AreEqual(10, pool.ThreadCount);
+ pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
+ Thread.Sleep(200);
+ Assert.AreEqual(10, pool.PoolSize);
pool.Dispose();
}
@@ -149,10 +149,10 @@
[TestMethod]
public void WorkerPoolDisposeTest() {
var pool = new WorkerPool(5, 20);
- Assert.AreEqual(5, pool.ThreadCount);
+ Assert.AreEqual(5, pool.PoolSize);
pool.Dispose();
- Thread.Sleep(100);
- Assert.AreEqual(0, pool.ThreadCount);
+ Thread.Sleep(500);
+ Assert.AreEqual(0, pool.PoolSize);
pool.Dispose();
}
@@ -244,7 +244,7 @@
[TestMethod]
public void ChainedMapTest() {
- using (var pool = new WorkerPool(8,100,0)) {
+ using (var pool = new WorkerPool(4,4,0)) {
int count = 10000;
double[] args = new double[count];
diff -r 0c924dff5498 -r f0568ff069a5 Implab.suo
Binary file Implab.suo has changed
diff -r 0c924dff5498 -r f0568ff069a5 Implab.v11.suo
Binary file Implab.v11.suo has changed
diff -r 0c924dff5498 -r f0568ff069a5 Implab/IPromise.cs
--- a/Implab/IPromise.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/IPromise.cs Wed Nov 13 14:03:20 2013 +0400
@@ -15,19 +15,6 @@
get;
}
- ///
- /// The current state of the promise.
- ///
- PromiseState State
- {
- get;
- }
- ///
- /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
- /// handler will be invoked immediatelly.
- ///
- /// The handler
- void HandleCancelled(Action handler);
}
}
diff -r 0c924dff5498 -r f0568ff069a5 Implab/Parallels/ArrayTraits.cs
--- a/Implab/Parallels/ArrayTraits.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs Wed Nov 13 14:03:20 2013 +0400
@@ -140,7 +140,7 @@
AsyncPool.InvokeNewThread(() => {
for (int i = 0; i < source.Length; i++) {
- if(promise.State != PromiseState.Unresolved)
+ if(promise.IsResolved)
break; // stop processing in case of error or cancellation
var idx = i;
semaphore.WaitOne();
diff -r 0c924dff5498 -r f0568ff069a5 Implab/Parallels/DispatchPool.cs
--- a/Implab/Parallels/DispatchPool.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/DispatchPool.cs Wed Nov 13 14:03:20 2013 +0400
@@ -9,10 +9,15 @@
public abstract class DispatchPool : IDisposable {
readonly int m_minThreads;
readonly int m_maxThreads;
- int m_runningThreads = 0;
- int m_maxRunningThreads = 0;
- int m_suspended = 0;
- int m_exitRequired = 0;
+
+ int m_createdThreads = 0; // the current size of the pool
+ int m_activeThreads = 0; // the count of threads which are active
+ int m_sleepingThreads = 0; // the count of currently inactive threads
+ int m_maxRunningThreads = 0; // the meximum reached size of the pool
+ int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
+ int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+ int m_wakeEvents = 0; // the count of wake events
+
AutoResetEvent m_hasTasks = new AutoResetEvent(false);
protected DispatchPool(int min, int max) {
@@ -44,9 +49,15 @@
StartWorker();
}
- public int ThreadCount {
+ public int PoolSize {
get {
- return m_runningThreads;
+ return m_createdThreads;
+ }
+ }
+
+ public int ActiveThreads {
+ get {
+ return m_activeThreads;
}
}
@@ -64,29 +75,120 @@
protected abstract bool TryDequeue(out TUnit unit);
- protected virtual bool ExtendPool() {
- if (m_suspended > 0) {
+ #region thread execution traits
+ int SignalThread() {
+ var signals = Interlocked.Increment(ref m_wakeEvents);
+ if(signals == 1)
m_hasTasks.Set();
+ return signals;
+ }
+
+ bool FetchSignalOrWait(int timeout) {
+ var start = Environment.TickCount;
+
+ // означает, что поток владеет блокировкой и при успешном получении сигнала должен
+ // ее вернуть, чтобы другой ожидающий поток смог
+ bool hasLock = false;
+ do {
+ int signals;
+ do {
+ signals = m_wakeEvents;
+ if (signals == 0)
+ break;
+ } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+
+ if (signals >= 1) {
+ if (signals > 1 && hasLock)
+ m_hasTasks.Set();
+ return true;
+ }
+
+ if (timeout != -1)
+ timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+
+ // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
+ // и уйдет на пустой цикл, после чего заблокируется
+
+ hasLock = true;
+ } while (m_hasTasks.WaitOne(timeout));
+
+ return false;
+ }
+
+ bool Sleep(int timeout) {
+ Interlocked.Increment(ref m_sleepingThreads);
+ if (FetchSignalOrWait(timeout)) {
+ Interlocked.Decrement(ref m_sleepingThreads);
return true;
- } else
- return StartWorker();
+ } else {
+ Interlocked.Decrement(ref m_sleepingThreads);
+ return false;
+ }
}
+ #endregion
///
/// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
///
- protected void WakePool() {
- m_hasTasks.Set(); // wake sleeping thread;
+ protected void GrowPool() {
+ if (m_exitRequired != 0)
+ return;
+ if (m_sleepingThreads > m_wakeEvents) {
+ //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
+
+ // all sleeping threads may gone
+ SignalThread(); // wake a sleeping thread;
- if (AllocateThreadSlot(1)) {
- var worker = new Thread(this.Worker);
- worker.IsBackground = true;
- worker.Start();
+ // we can't check whether signal has been processed
+ // anyway it may take some time for the thread to start
+ // we will ensure that at least one thread is running
+
+ if (AllocateThreadSlot(1)) {
+ // if there were no threads in the pool
+ var worker = new Thread(this.Worker);
+ worker.IsBackground = true;
+ worker.Start();
+ }
+ } else {
+ // if there is no sleeping threads in the pool
+ StartWorker();
}
}
- protected virtual void Suspend() {
- m_hasTasks.WaitOne();
+ private bool Suspend() {
+ //no tasks left, exit if the thread is no longer needed
+ bool last;
+ bool requestExit;
+
+ // if threads have a timeout before releasing
+ if (m_releaseTimeout > 0)
+ requestExit = !Sleep(m_releaseTimeout);
+ else
+ requestExit = true;
+
+ if (!requestExit)
+ return true;
+
+ // release unsused thread
+ if (requestExit && ReleaseThreadSlot(out last)) {
+ // in case at the moment the last thread was being released
+ // a new task was added to the queue, we need to try
+ // to revoke the thread to avoid the situation when the task is left unprocessed
+ if (last && Sleep(0)) { // Sleep(0) will fetch pending task or will return false
+ if (AllocateThreadSlot(1))
+ return true; // spin again...
+ else
+ SignalThread(); // since Sleep(0) has fetched the signal we neet to reschedule it
+
+ }
+
+ return false;
+ }
+
+ // wait till infinity
+ Sleep(-1);
+
+ return true;
}
#region thread slots traits
@@ -95,11 +197,11 @@
int current;
// use spins to allocate slot for the new thread
do {
- current = m_runningThreads;
+ current = m_createdThreads;
if (current >= m_maxThreads || m_exitRequired != 0)
// no more slots left or the pool has been disposed
return false;
- } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+ } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
UpdateMaxThreads(current + 1);
@@ -107,7 +209,7 @@
}
bool AllocateThreadSlot(int desired) {
- if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
+ if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1))
return false;
UpdateMaxThreads(desired);
@@ -120,11 +222,11 @@
int current;
// use spins to release slot for the new thread
do {
- current = m_runningThreads;
+ current = m_createdThreads;
if (current <= m_minThreads && m_exitRequired == 0)
// the thread is reserved
return false;
- } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+ } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current));
last = (current == 1);
@@ -136,7 +238,7 @@
///
/// true - no more threads left
bool ReleaseThreadSlotAnyway() {
- var left = Interlocked.Decrement(ref m_runningThreads);
+ var left = Interlocked.Decrement(ref m_createdThreads);
return left == 0;
}
@@ -164,54 +266,41 @@
}
}
- bool FetchTask(out TUnit unit) {
+ protected abstract void InvokeUnit(TUnit unit);
+
+ void Worker() {
+ TUnit unit;
+ //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
+ Interlocked.Increment(ref m_activeThreads);
do {
// exit if requested
if (m_exitRequired != 0) {
// release the thread slot
+ Interlocked.Decrement(ref m_activeThreads);
if (ReleaseThreadSlotAnyway()) // it was the last worker
m_hasTasks.Dispose();
else
- m_hasTasks.Set(); // wake next worker
+ SignalThread(); // wake next worker
unit = default(TUnit);
- return false;
+ break;
}
// fetch task
if (TryDequeue(out unit)) {
- ExtendPool();
- return true;
+ InvokeUnit(unit);
+ continue;
}
- //no tasks left, exit if the thread is no longer needed
- bool last;
- if (ReleaseThreadSlot(out last)) {
- if (last && m_hasTasks.WaitOne(0)) {
- if (AllocateThreadSlot(1))
- continue; // spin again...
- else
- // we failed to reallocate slot for this thread
- // therefore we need to release the event
- m_hasTasks.Set();
- }
-
- return false;
- }
+ Interlocked.Decrement(ref m_activeThreads);
// entering suspend state
- Interlocked.Increment(ref m_suspended);
// keep this thread and wait
- Suspend();
- Interlocked.Decrement(ref m_suspended);
+ if (!Suspend())
+ break;
+ //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
+ Interlocked.Increment(ref m_activeThreads);
} while (true);
- }
-
- protected abstract void InvokeUnit(TUnit unit);
-
- void Worker() {
- TUnit unit;
- while (FetchTask(out unit))
- InvokeUnit(unit);
+ //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
}
protected virtual void Dispose(bool disposing) {
@@ -221,7 +310,10 @@
return;
// wake sleeping threads
- m_hasTasks.Set();
+ if (m_createdThreads > 0)
+ SignalThread();
+ else
+ m_hasTasks.Dispose();
GC.SuppressFinalize(this);
}
}
diff -r 0c924dff5498 -r f0568ff069a5 Implab/Parallels/MTQueue.cs
--- a/Implab/Parallels/MTQueue.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/MTQueue.cs Wed Nov 13 14:03:20 2013 +0400
@@ -42,12 +42,13 @@
next = first.next;
if (next == null) {
// this is the last element,
- // then try to update tail
+ // then try to update the tail
if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
- // this is inconsistent situation which means that the queue is empty
+ // this is a ace condition
if (m_last == null)
+ // the queue is empty
return false;
- // tail has been changed, that means that we need to restart
+ // tail has been changed, than we need to restart
continue;
}
diff -r 0c924dff5498 -r f0568ff069a5 Implab/Parallels/WorkerPool.cs
--- a/Implab/Parallels/WorkerPool.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Parallels/WorkerPool.cs Wed Nov 13 14:03:20 2013 +0400
@@ -57,16 +57,8 @@
var len = Interlocked.Increment(ref m_queueLength);
m_queue.Enqueue(unit);
- if(!ExtendPool())
- WakePool();
- }
-
- protected override bool ExtendPool() {
- if (m_queueLength <= m_threshold*ThreadCount)
- // in this case we are in active thread and it request for additional workers
- // satisfy it only when queue is longer than threshold
- return false;
- return base.ExtendPool();
+ if (len > m_threshold*ActiveThreads)
+ GrowPool();
}
protected override bool TryDequeue(out Action unit) {
@@ -81,9 +73,5 @@
unit();
}
- protected override void Suspend() {
- if (m_queueLength == 0)
- base.Suspend();
- }
}
}
diff -r 0c924dff5498 -r f0568ff069a5 Implab/Promise.cs
--- a/Implab/Promise.cs Fri Nov 08 01:27:04 2013 +0400
+++ b/Implab/Promise.cs Wed Nov 13 14:03:20 2013 +0400
@@ -3,6 +3,7 @@
using System.Reflection;
using System.Diagnostics;
using System.Threading;
+using Implab.Parallels;
namespace Implab {
@@ -48,24 +49,53 @@
///
public class Promise : IPromise {
- struct ResultHandlerInfo {
+ struct HandlerDescriptor {
public ResultHandler resultHandler;
public ErrorHandler errorHandler;
+ public Action cancellHandler;
+
+ public void Resolve(T result) {
+ if (resultHandler != null)
+ try {
+ resultHandler(result);
+ } catch (Exception e) {
+ Reject(e);
+ }
+ }
+
+ public void Reject(Exception err) {
+ if (errorHandler != null)
+ try {
+ errorHandler(err);
+ } catch {
+ }
+ }
+
+ public void Cancel() {
+ if (cancellHandler != null)
+ try {
+ cancellHandler();
+ } catch {
+ }
+ }
}
+ const int UnresolvedSate = 0;
+ const int TransitionalState = 1;
+ const int ResolvedState = 2;
+ const int RejectedState = 3;
+ const int CancelledState = 4;
+
readonly IPromise m_parent;
-
- LinkedList m_resultHandlers = new LinkedList();
- LinkedList m_cancelHandlers = new LinkedList();
+ readonly bool m_cancellable;
- readonly object m_lock = new Object();
- readonly bool m_cancellable;
int m_childrenCount = 0;
-
- PromiseState m_state;
+ int m_state;
T m_result;
Exception m_error;
+ readonly MTQueue m_handlers = new MTQueue();
+
public Promise() {
m_cancellable = true;
}
@@ -73,8 +103,6 @@
public Promise(IPromise parent, bool cancellable) {
m_cancellable = cancellable;
m_parent = parent;
- if (parent != null)
- parent.HandleCancelled(InternalCancel);
}
void InternalCancel() {
@@ -82,22 +110,39 @@
Cancel(false);
}
+ bool BeginTransit() {
+ return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
+ }
+
+ void CompleteTransit(int state) {
+ if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
+ throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
+ }
+
+ public bool IsResolved {
+ get {
+ return m_state > 1;
+ }
+ }
+
+ public bool IsCancelled {
+ get {
+ return m_state == CancelledState;
+ }
+ }
+
///
/// Выполняет обещание, сообщая об успешном выполнении.
///
/// Результат выполнения.
/// Данное обещание уже выполнено
public void Resolve(T result) {
- lock (m_lock) {
- if (m_state == PromiseState.Cancelled)
- return;
- if (m_state != PromiseState.Unresolved)
- throw new InvalidOperationException("The promise is already resolved");
+ if (BeginTransit()) {
m_result = result;
- m_state = PromiseState.Resolved;
- }
-
- OnStateChanged();
+ CompleteTransit(ResolvedState);
+ OnStateChanged();
+ } else if (m_state != CancelledState)
+ throw new InvalidOperationException("The promise is already resolved");
}
///
@@ -111,16 +156,12 @@
/// Исключение возникшее при выполнении операции
/// Данное обещание уже выполнено
public void Reject(Exception error) {
- lock (m_lock) {
- if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
- return;
- if (m_state != PromiseState.Unresolved)
- throw new InvalidOperationException("The promise is already resolved");
+ if (BeginTransit()) {
m_error = error;
- m_state = PromiseState.Rejected;
- }
-
- OnStateChanged();
+ CompleteTransit(RejectedState);
+ OnStateChanged();
+ } else if (m_state == ResolvedState)
+ throw new InvalidOperationException("The promise is already resolved");
}
///
@@ -144,27 +185,27 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
-
+ ResultHandler resultHandler;
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
+ ErrorHandler errorHandler;
if (error != null)
- handlerInfo.errorHandler = x => {
+ errorHandler = x => {
try {
error(x);
} catch { }
medium.Reject(x);
};
else
- handlerInfo.errorHandler = medium.Reject;
+ errorHandler = medium.Reject;
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, errorHandler, medium.InternalCancel);
return medium;
}
@@ -182,27 +223,28 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
+ ResultHandler resultHandler;
+ ErrorHandler errorHandler;
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
if (error != null)
- handlerInfo.errorHandler = x => {
+ errorHandler = x => {
try {
medium.Resolve(error(x));
} catch { }
medium.Reject(x);
};
else
- handlerInfo.errorHandler = medium.Reject;
+ errorHandler = medium.Reject;
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, errorHandler, medium.InternalCancel);
return medium;
}
@@ -214,19 +256,17 @@
var medium = new Promise(this, true);
- var handlerInfo = new ResultHandlerInfo();
-
+ ResultHandler resultHandler;
+
if (success != null)
- handlerInfo.resultHandler = x => {
+ resultHandler = x => {
success(x);
medium.Resolve(x);
};
else
- handlerInfo.resultHandler = medium.Resolve;
+ resultHandler = medium.Resolve;
- handlerInfo.errorHandler = medium.Reject;
-
- AddHandler(handlerInfo);
+ AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
return medium;
}
@@ -249,15 +289,17 @@
var medium = new Promise(this, true);
- AddHandler(new ResultHandlerInfo {
- errorHandler = e => {
+ AddHandler(
+ null,
+ e => {
try {
medium.Resolve(handler(e));
} catch (Exception e2) {
medium.Reject(e2);
}
- }
- });
+ },
+ medium.InternalCancel
+ );
return medium;
}
@@ -268,8 +310,8 @@
var medium = new Promise();
- AddHandler(new ResultHandlerInfo {
- resultHandler = x => {
+ AddHandler(
+ x => {
// to avoid handler being called multiple times we handle exception by ourselfs
try {
handler();
@@ -278,13 +320,16 @@
medium.Reject(e);
}
},
- errorHandler = x => {
+
+ e => {
try {
handler();
} catch { }
- medium.Reject(x);
- }
- });
+ medium.Reject(e);
+ },
+
+ medium.InternalCancel
+ );
return medium;
}
@@ -304,17 +349,22 @@
// создаем прицепленное обещание
var chained = new Promise();
- AddHandler(new ResultHandlerInfo() {
- resultHandler = result => chained.Resolve(mapper(result)),
- errorHandler = delegate(Exception e) {
- if (error != null)
- try {
- error(e);
- } catch { }
- // в случае ошибки нужно передать исключение дальше по цепочке
- chained.Reject(e);
- }
- });
+ ResultHandler resultHandler = result => chained.Resolve(mapper(result));
+ ErrorHandler errorHandler = delegate(Exception e) {
+ if (error != null)
+ try {
+ error(e);
+ } catch { }
+ // в случае ошибки нужно передать исключение дальше по цепочке
+ chained.Reject(e);
+ };
+
+
+ AddHandler(
+ resultHandler,
+ errorHandler,
+ chained.InternalCancel
+ );
return chained;
}
@@ -341,27 +391,32 @@
// передать через него результаты работы.
var medium = new Promise(this, true);
- AddHandler(new ResultHandlerInfo {
- resultHandler = delegate(T result) {
- if (medium.State == PromiseState.Cancelled)
- return;
+ ResultHandler resultHandler = delegate(T result) {
+ if (medium.IsCancelled)
+ return;
- var promise = chained(result);
+ var promise = chained(result);
- // notify chained operation that it's not needed
- medium.Cancelled(() => promise.Cancel());
- promise.Then(
- x => medium.Resolve(x),
- e => medium.Reject(e)
- );
- },
- errorHandler = delegate(Exception e) {
- if (error != null)
- error(e);
- // в случае ошибки нужно передать исключение дальше по цепочке
- medium.Reject(e);
- }
- });
+ // notify chained operation that it's not needed
+ medium.Cancelled(() => promise.Cancel());
+ promise.Then(
+ x => medium.Resolve(x),
+ e => medium.Reject(e)
+ );
+ };
+
+ ErrorHandler errorHandler = delegate(Exception e) {
+ if (error != null)
+ error(e);
+ // в случае ошибки нужно передать исключение дальше по цепочке
+ medium.Reject(e);
+ };
+
+ AddHandler(
+ resultHandler,
+ errorHandler,
+ medium.InternalCancel
+ );
return medium;
}
@@ -371,19 +426,19 @@
}
public Promise Cancelled(Action handler) {
- if (handler == null)
- return this;
- lock (m_lock) {
- if (m_state == PromiseState.Unresolved)
- m_cancelHandlers.AddLast(handler);
- else if (m_state == PromiseState.Cancelled)
- handler();
- }
+ AddHandler(null, null, handler);
return this;
}
- public void HandleCancelled(Action handler) {
- Cancelled(handler);
+ public Promise Finally(Action handler) {
+ if (handler == null)
+ throw new ArgumentNullException("handler");
+ AddHandler(
+ x => handler(),
+ e => handler(),
+ handler
+ );
+ return this;
}
///
@@ -415,15 +470,15 @@
if (!evt.WaitOne(timeout, true))
throw new TimeoutException();
- switch (State) {
- case PromiseState.Resolved:
+ switch (m_state) {
+ case ResolvedState:
return m_result;
- case PromiseState.Cancelled:
+ case CancelledState:
throw new OperationCanceledException();
- case PromiseState.Rejected:
+ case RejectedState:
throw new TargetInvocationException(m_error);
default:
- throw new ApplicationException(String.Format("Invalid promise state {0}", State));
+ throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
}
}
@@ -431,40 +486,45 @@
return Join(Timeout.Infinite);
}
- void AddHandler(ResultHandlerInfo handler) {
- bool invokeRequired = false;
+ void AddHandler(ResultHandler success, ErrorHandler error, Action cancel) {
+ Interlocked.Increment(ref m_childrenCount);
+
+ HandlerDescriptor handler = new HandlerDescriptor {
+ resultHandler = success,
+ errorHandler = error,
+ cancellHandler = cancel
+ };
- lock (m_lock) {
- m_childrenCount++;
- if (m_state == PromiseState.Unresolved) {
- m_resultHandlers.AddLast(handler);
- } else
- invokeRequired = true;
+ bool queued;
+
+ if (!IsResolved) {
+ m_handlers.Enqueue(handler);
+ queued = true;
+ } else {
+ // the promise is in resolved state, just invoke the handled with minimum overhead
+ queued = false;
+ InvokeHandler(handler);
}
- // обработчики не должны блокировать сам объект
- if (invokeRequired)
+ if (queued && IsResolved && m_handlers.TryDequeue(out handler))
+ // if the promise have been resolved while we was adding handler to the queue
+ // we can't guarantee that someone is still processing it
+ // therefore we will fetch a handler from the queue and execute it
+ // note that fetched handler may be not the one we have added
InvokeHandler(handler);
+
}
- void InvokeHandler(ResultHandlerInfo handler) {
+ void InvokeHandler(HandlerDescriptor handler) {
switch (m_state) {
- case PromiseState.Resolved:
- try {
- if (handler.resultHandler != null)
- handler.resultHandler(m_result);
- } catch (Exception e) {
- try {
- if (handler.errorHandler != null)
- handler.errorHandler(e);
- } catch { }
- }
+ case ResolvedState:
+ handler.Resolve(m_result);
break;
- case PromiseState.Rejected:
- try {
- if (handler.errorHandler != null)
- handler.errorHandler(m_error);
- } catch { }
+ case RejectedState:
+ handler.Reject(m_error);
+ break;
+ case CancelledState:
+ handler.Cancel();
break;
default:
// do nothing
@@ -473,76 +533,31 @@
}
protected virtual void OnStateChanged() {
- switch (m_state) {
- case PromiseState.Resolved:
- foreach (var resultHandlerInfo in m_resultHandlers)
- try {
- if (resultHandlerInfo.resultHandler != null)
- resultHandlerInfo.resultHandler(m_result);
- } catch (Exception e) {
- try {
- if (resultHandlerInfo.errorHandler != null)
- resultHandlerInfo.errorHandler(e);
- } catch { }
- }
- break;
- case PromiseState.Cancelled:
- foreach (var cancelHandler in m_cancelHandlers)
- cancelHandler();
- break;
- case PromiseState.Rejected:
- foreach (var resultHandlerInfo in m_resultHandlers)
- try {
- if (resultHandlerInfo.errorHandler != null)
- resultHandlerInfo.errorHandler(m_error);
- } catch { }
- break;
- default:
- throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
- }
-
- m_resultHandlers = null;
- m_cancelHandlers = null;
+ HandlerDescriptor handler;
+ while (m_handlers.TryDequeue(out handler))
+ InvokeHandler(handler);
}
public bool IsExclusive {
get {
- lock (m_lock) {
- return m_childrenCount <= 1;
- }
- }
- }
-
- public PromiseState State {
- get {
- lock (m_lock) {
- return m_state;
- }
+ return m_childrenCount <= 1;
}
}
protected bool Cancel(bool dependencies) {
- bool result;
-
- lock (m_lock) {
- if (m_state == PromiseState.Unresolved) {
- m_state = PromiseState.Cancelled;
- result = true;
- } else {
- result = false;
- }
- }
-
- if (result)
+ if (BeginTransit()) {
+ CompleteTransit(CancelledState);
OnStateChanged();
- if (dependencies && m_parent != null && m_parent.IsExclusive) {
- m_parent.Cancel();
+ if (dependencies && m_parent != null && m_parent.IsExclusive)
+ m_parent.Cancel();
+
+ return true;
+ } else {
+ return false;
}
-
- return result;
}
}