Mercurial > pub > ImplabNet
changeset 242:cbe10ac0731e v3
Working on promises
author | cin |
---|---|
date | Wed, 24 Jan 2018 03:03:21 +0300 |
parents | fa6cbf4d8841 |
children | b1e0ffdf3451 |
files | Implab.Fx/StaApartment.cs Implab.Test/CancelationTests.cs Implab/AbstractEvent.cs Implab/AbstractPromise.cs Implab/AbstractPromiseT.cs Implab/FailedPromise.cs Implab/FailedPromiseT.cs Implab/IPromise.cs Implab/IPromiseT.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/SimpleAsyncQueue.cs Implab/PromiseAwaiter.cs Implab/PromiseAwaiterT.cs Implab/SuccessPromise.cs Implab/SuccessPromiseT.cs packages/repositories.config |
diffstat | 16 files changed, 131 insertions(+), 328 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Fx/StaApartment.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab.Fx/StaApartment.cs Wed Jan 24 03:03:21 2018 +0300 @@ -195,7 +195,7 @@ protected override void Dispose(bool disposing) { if (disposing) { - if (!m_threadTerminated.IsResolved) + if (!m_threadTerminated.IsFulfilled) m_syncContext.Post(x => Application.ExitThread(), null); } base.Dispose(disposing);
--- a/Implab.Test/CancelationTests.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab.Test/CancelationTests.cs Wed Jan 24 03:03:21 2018 +0300 @@ -40,7 +40,7 @@ // cancel the promise Assert.IsTrue(p.CancelOperationIfRequested()); Assert.IsTrue(p.IsCancelled); - Assert.AreSame(reason, p.Error); + Assert.AreSame(reason, p.RejectReason); } [TestMethod] @@ -76,7 +76,7 @@ task.Cancel(); Assert.IsTrue(task.IsCancellationRequested); Assert.IsFalse(task.IsCancelled); - Assert.IsFalse(task.IsResolved); + Assert.IsFalse(task.IsFulfilled); finish.Set(); task.Join(1000);
--- a/Implab/AbstractEvent.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/AbstractEvent.cs Wed Jan 24 03:03:21 2018 +0300 @@ -2,56 +2,50 @@ using Implab.Parallels; using System.Threading; using System.Reflection; +using System.Diagnostics; namespace Implab { - public abstract class AbstractEvent<THandler> : ICancellable { + public abstract class AbstractEvent<THandler> where THandler : class { - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; + const int PENDING_SATE = 0; + protected const int TRANSITIONAL_STATE = 1; + protected const int SUCCEEDED_STATE = 2; protected const int REJECTED_STATE = 3; - protected const int CANCELLED_STATE = 4; - const int CANCEL_NOT_REQUESTED = 0; - const int CANCEL_REQUESTING = 1; - const int CANCEL_REQUESTED = 2; - - const int RESERVED_HANDLERS_COUNT = 4; - - int m_state; + volatile int m_state; Exception m_error; - int m_handlersCount; - //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - THandler[] m_handlers; + THandler m_handler; SimpleAsyncQueue<THandler> m_extraHandlers; - int m_handlerPointer = -1; - int m_handlersCommited; - - int m_cancelRequest; - Exception m_cancelationReason; #region state managment - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + protected bool BeginTransit() { + return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); } - void CompleteTransit(int state) { + protected void CompleteTransit(int state) { +#if DEBUG if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); +#else + m_state = state; +#endif + Signal(); } - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); + protected void WaitTransition() { + if (m_state == TRANSITIONAL_STATE) { + SpinWait spin; + do { + spin.SpinOnce(); + } while (m_state == TRANSITIONAL_STATE); } } protected bool BeginSetResult() { if (!BeginTransit()) { WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); return false; } return true; @@ -59,7 +53,6 @@ protected void EndSetResult() { CompleteTransit(SUCCEEDED_STATE); - Signal(); } @@ -78,8 +71,6 @@ if (BeginTransit()) { m_error = error; CompleteTransit(REJECTED_STATE); - - Signal(); } else { WaitTransition(); if (m_state == SUCCEEDED_STATE) @@ -87,58 +78,33 @@ } } - /// <summary> - /// Отменяет операцию, если это возможно. - /// </summary> - /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> - protected void SetCancelled(Exception reason) { - if (BeginTransit()) { - m_error = reason; - CompleteTransit(CANCELLED_STATE); - Signal(); - } - } - protected abstract void SignalHandler(THandler handler, int signal); void Signal() { - var hp = m_handlerPointer; - var slot = hp +1 ; - while (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { - SignalHandler(m_handlers[slot], m_state); - } - hp = m_handlerPointer; - slot = hp +1 ; - } - - - if (m_extraHandlers != null) { - THandler handler; - while (m_extraHandlers.TryDequeue(out handler)) - SignalHandler(handler, m_state); - } + THandler handler; + while (TryDequeueHandler(out handler)) + SignalHandler(handler, m_state); } #endregion - protected abstract Signal GetResolveSignal(); + protected abstract Signal GetFulfillSignal(); #region synchronization traits protected void WaitResult(int timeout) { - if (!(IsResolved || GetResolveSignal().Wait(timeout))) + if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) throw new TimeoutException(); - switch (m_state) { - case SUCCEEDED_STATE: - return; - case CANCELLED_STATE: - throw new OperationCanceledException("The operation has been cancelled", m_error); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); - } + if (IsRejected) + Rethrow(); + } + + protected void Rethrow() { + Debug.Assert(m_error != null); + if (m_error is OperationCanceledException) + throw new OperationCanceledException("Operation cancelled", m_error); + else + throw new TargetInvocationException(m_error); } #endregion @@ -150,149 +116,55 @@ // the promise is in the resolved state, just invoke the handler SignalHandler(handler, m_state); } else { - var slot = Interlocked.Increment(ref m_handlersCount) - 1; - - if (slot < RESERVED_HANDLERS_COUNT) { - - if (slot == 0) { - m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; - } else { - while (m_handlers == null) - Thread.MemoryBarrier(); - } - - m_handlers[slot] = handler; - - while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { - } + if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { + if (m_extraHandlers == null) + // compare-exchange will fprotect from loosing already created queue + Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); + m_extraHandlers.Enqueue(handler); + } - if (m_state > 1) { - do { - var hp = m_handlerPointer; - slot = hp + 1; - if (slot < m_handlersCommited) { - if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) - continue; - SignalHandler(m_handlers[slot], m_state); - } - break; - } while(true); - } - } else { - if (slot == RESERVED_HANDLERS_COUNT) { - m_extraHandlers = new SimpleAsyncQueue<THandler>(); - } else { - while (m_extraHandlers == null) - Thread.MemoryBarrier(); - } - - m_extraHandlers.Enqueue(handler); - - if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) + if (m_state > 1 && TryDequeueHandler(out handler)) // if the promise have been resolved while we was adding the handler to the queue // we can't guarantee that someone is still processing it // therefore we need to fetch a handler from the queue and execute it // note that fetched handler may be not the one that we have added // even we can fetch no handlers at all :) - SignalHandler(handler, m_state); - } + SignalHandler(handler, m_state); } + + } + + bool TryDequeueHandler(out THandler handler) { + handler = Interlocked.Exchange(ref m_handler, null); + if (handler != null) + return true; + return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler); } #endregion #region IPromise implementation - public bool IsResolved { + public bool IsFulfilled { get { - Thread.MemoryBarrier(); - return m_state > 1; + return m_state > TRANSITIONAL_STATE; } } - public bool IsCancelled { + public bool IsRejected { get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; + return m_state == REJECTED_STATE; } } #endregion - public Exception Error { + public Exception RejectReason { get { return m_error; } } - public bool CancelOperationIfRequested() { - if (IsCancellationRequested) { - CancelOperation(CancellationReason); - return true; - } - return false; - } - - public virtual void CancelOperation(Exception reason) { - SetCancelled(reason); - } - - public void CancellationRequested(Action<Exception> handler) { - Safe.ArgumentNotNull(handler, "handler"); - if (IsCancellationRequested) - handler(CancellationReason); - - if (m_cancelationHandlers == null) - Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); - - m_cancelationHandlers.Enqueue(handler); - - if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) - // TryDeque implies MemoryBarrier() - handler(m_cancelationReason); - } - - public bool IsCancellationRequested { - get { - do { - if (m_cancelRequest == CANCEL_NOT_REQUESTED) - return false; - if (m_cancelRequest == CANCEL_REQUESTED) - return true; - Thread.MemoryBarrier(); - } while(true); - } - } - - public Exception CancellationReason { - get { - do { - Thread.MemoryBarrier(); - } while(m_cancelRequest == CANCEL_REQUESTING); - - return m_cancelationReason; - } - } - - #region ICancellable implementation - - public void Cancel() { - Cancel(null); - } - - public void Cancel(Exception reason) { - if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { - m_cancelationReason = reason; - m_cancelRequest = CANCEL_REQUESTED; - if (m_cancelationHandlers != null) { - Action<Exception> handler; - while (m_cancelationHandlers.TryDequeue(out handler)) - handler(m_cancelationReason); - } - } - } - - #endregion } }
--- a/Implab/AbstractPromise.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/AbstractPromise.cs Wed Jan 24 03:03:21 2018 +0300 @@ -3,28 +3,16 @@ namespace Implab { public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise { - public struct HandlerDescriptor { + public class HandlerDescriptor { readonly Action m_handler; readonly Action<Exception> m_error; - readonly Action<Exception> m_cancel; - readonly PromiseEventType m_mask; - - public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) { + public HandlerDescriptor(Action success, Action<Exception> error) { m_handler = success; m_error = error; - m_cancel = cancel; - m_mask = PromiseEventType.Success; - } - - public HandlerDescriptor(Action handler, PromiseEventType mask) { - m_handler = handler; - m_error = null; - m_cancel = null; - m_mask = mask; } public void SignalSuccess() { - if ((m_mask & PromiseEventType.Success) != 0 && m_handler != null) { + if (m_handler != null) { try { m_handler(); // Analysis disable once EmptyGeneralCatchClause @@ -40,28 +28,6 @@ // Analysis disable once EmptyGeneralCatchClause } catch { } - } else if ((m_mask & PromiseEventType.Error ) != 0 && m_handler != null) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } - } - - public void SignalCancel(Exception reason) { - if (m_cancel != null) { - try { - m_cancel(reason); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } - } else if ( (m_mask & PromiseEventType.Cancelled) != 0 && m_handler != null) { - try { - m_handler(); - // Analysis disable once EmptyGeneralCatchClause - } catch { - } } } } @@ -75,48 +41,29 @@ handler.SignalSuccess(); break; case REJECTED_STATE: - handler.SignalError(Error); - break; - case CANCELLED_STATE: - handler.SignalCancel(CancellationReason); + handler.SignalError(RejectReason); break; default: throw new InvalidOperationException(String.Format("Invalid promise signal: {0}", signal)); } } - protected override Signal GetResolveSignal() { + protected override Signal GetFulfillSignal() { var signal = new Signal(); - On(signal.Set, PromiseEventType.All); + On(signal.Set, e => signal.Set()); return signal; } #endregion - public Type PromiseType { + public Type ResultType { get { return typeof(void); } } - public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) { - AddHandler(new HandlerDescriptor(success, error, cancel)); - return this; - } - - public IPromise On(Action success, Action<Exception> error) { - AddHandler(new HandlerDescriptor(success, error, null)); - return this; - } - - public IPromise On(Action success) { - AddHandler(new HandlerDescriptor(success, null, null)); - return this; - } - - public IPromise On(Action handler, PromiseEventType events) { - AddHandler(new HandlerDescriptor(handler,events)); - return this; + public void On(Action success, Action<Exception> error) { + AddHandler(new HandlerDescriptor(success, error)); } public IPromise<T> Cast<T>() {
--- a/Implab/AbstractPromiseT.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/AbstractPromiseT.cs Wed Jan 24 03:03:21 2018 +0300 @@ -84,7 +84,7 @@ } } - public Type PromiseType { + public Type ResultType { get { return typeof(T); } @@ -167,7 +167,7 @@ #region implemented abstract members of AbstractPromise - protected override Signal GetResolveSignal() { + protected override Signal GetFulfillSignal() { var signal = new Signal(); AddHandler(new HandlerDescriptor(signal.Set, PromiseEventType.All)); return signal; @@ -179,7 +179,7 @@ handler.SignalSuccess(m_result); break; case REJECTED_STATE: - handler.SignalError(Error); + handler.SignalError(RejectReason); break; case CANCELLED_STATE: handler.SignalCancel(CancellationReason);
--- a/Implab/FailedPromise.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/FailedPromise.cs Wed Jan 24 03:03:21 2018 +0300 @@ -53,20 +53,20 @@ } public void Join() { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } public void Join(int timeout) { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } - public virtual Type PromiseType { + public virtual Type ResultType { get { return typeof(void); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -78,7 +78,7 @@ } } - public Exception Error { + public Exception RejectReason { get { return m_error; }
--- a/Implab/FailedPromiseT.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/FailedPromiseT.cs Wed Jan 24 03:03:21 2018 +0300 @@ -9,7 +9,7 @@ public IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel) { if (error != null) { try { - error(Error); + error(RejectReason); // Analysis disable once EmptyGeneralCatchClause } catch { } @@ -20,7 +20,7 @@ public IPromise<T> On(Action<T> success, Action<Exception> error) { if (error != null) { try { - error(Error); + error(RejectReason); // Analysis disable once EmptyGeneralCatchClause } catch { } @@ -33,11 +33,11 @@ } T IPromise<T>.Join() { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); } T IPromise<T>.Join(int timeout) { - throw new TargetInvocationException(Error); + throw new TargetInvocationException(RejectReason); }
--- a/Implab/IPromise.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/IPromise.cs Wed Jan 24 03:03:21 2018 +0300 @@ -9,42 +9,29 @@ /// <summary> /// Тип результата, получаемого через данное обещание. /// </summary> - Type PromiseType { get; } + Type ResultType { get; } /// <summary> /// Обещание является выполненым, либо успешно, либо с ошибкой, либо отменено. /// </summary> - bool IsResolved { get; } + bool IsFulfilled { get; } - /// <summary> - /// Обещание было отменено. - /// </summary> - bool IsCancelled { get; } + bool IsRejected { get; } + + bool IsResolved { get; } /// <summary> /// Исключение возникшее в результате выполнения обещания, либо причина отмены. /// </summary> - Exception Error { get; } + Exception RejectReason { get; } /// <summary> /// Adds specified listeners to the current promise. /// </summary> /// <param name="success">The handler called on the successful promise completion.</param> /// <param name="error">The handler is called if an error while completing the promise occurred.</param> - /// <param name="cancel">The handler is called in case of promise cancellation.</param> /// <returns>The current promise.</returns> - IPromise On(Action success, Action<Exception> error, Action<Exception> cancel); - IPromise On(Action success, Action<Exception> error); - IPromise On(Action success); - - /// <summary> - /// Adds specified listeners to the current promise. - /// </summary> - /// <param name="handler">The handler called on the specified events.</param> - /// <param name = "events">The combination of flags denoting the events for which the - /// handler shoud be called.</param> - /// <returns>The current promise.</returns> - IPromise On(Action handler, PromiseEventType events); + void On(Action success, Action<Exception> error); /// <summary> /// Преобразует результат обещания к заданному типу и возвращает новое обещание.
--- a/Implab/IPromiseT.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/IPromiseT.cs Wed Jan 24 03:03:21 2018 +0300 @@ -3,23 +3,10 @@ namespace Implab { public interface IPromise<out T> : IPromise { - IPromise<T> On(Action<T> success, Action<Exception> error, Action<Exception> cancel); - - IPromise<T> On(Action<T> success, Action<Exception> error); - - IPromise<T> On(Action<T> success); + void On(Action<T> success, Action<Exception> error); new T Join(); new T Join(int timeout); - - new IPromise<T> On(Action success, Action<Exception> error, Action<Exception> cancel); - - new IPromise<T> On(Action success, Action<Exception> error); - - new IPromise<T> On(Action success); - - new IPromise<T> On(Action handler, PromiseEventType events); - } }
--- a/Implab/Parallels/ArrayTraits.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/Parallels/ArrayTraits.cs Wed Jan 24 03:03:21 2018 +0300 @@ -164,7 +164,7 @@ // Analysis disable AccessToDisposedClosure AsyncPool.RunThread<int>(() => { for (int i = 0; i < source.Length; i++) { - if(promise.IsResolved) + if(promise.IsFulfilled) break; // stop processing in case of error or cancellation var idx = i;
--- a/Implab/Parallels/SimpleAsyncQueue.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/Parallels/SimpleAsyncQueue.cs Wed Jan 24 03:03:21 2018 +0300 @@ -15,12 +15,12 @@ // the reader and the writer are mainteined completely independent, // the reader can read next item when m_first.next is not null - // the writer creates the a new node, moves m_last to this node and + // the writer creates a new node, moves m_last to this node and // only after that restores the reference from the previous node - // making available the reader to read the new node. + // making the reader be able to read the new node. - Node m_first; // position on the node which is already read - Node m_last; // position on the node which is already written + volatile Node m_first; // position on the node which is already read + volatile Node m_last; // position on the node which is already written public SimpleAsyncQueue() { m_first = m_last = new Node(default(T)); @@ -35,29 +35,38 @@ // release-fence last.next = next; - + } public bool TryDequeue(out T value) { - Node first; - Node next; + Node first = m_first; ; + Node next = first.next; ; + + if (next == null) { + value = default(T); + return false; + } + + var first2 = Interlocked.CompareExchange(ref m_first, next, first); + + if (first != first2) { + // head is updated by someone else - Thread.MemoryBarrier(); // ensure m_first is fresh - SpinWait spin = new SpinWait(); - do { - first = m_first; - // aquire-fence - next = first.next; - if (next == null) { - value = default(T); - return false; - } - - if (first == Interlocked.CompareExchange(ref m_first, next, first)) - // head succesfully updated - break; - spin.SpinOnce(); - } while (true); + SpinWait spin = new SpinWait(); + do { + first = first2; + next = first.next; + if (next == null) { + value = default(T); + return false; + } + + first2 = Interlocked.CompareExchange(ref m_first, next, first); + if (first == first2) + break; + spin.SpinOnce(); + } while (true); + } value = next.value; return true;
--- a/Implab/PromiseAwaiter.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/PromiseAwaiter.cs Wed Jan 24 03:03:21 2018 +0300 @@ -20,7 +20,7 @@ public bool IsCompleted { get { - return m_promise.IsResolved; + return m_promise.IsFulfilled; } } }
--- a/Implab/PromiseAwaiterT.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/PromiseAwaiterT.cs Wed Jan 24 03:03:21 2018 +0300 @@ -20,7 +20,7 @@ public bool IsCompleted { get { - return m_promise.IsResolved; + return m_promise.IsFulfilled; } } }
--- a/Implab/SuccessPromise.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/SuccessPromise.cs Wed Jan 24 03:03:21 2018 +0300 @@ -58,13 +58,13 @@ public void Join(int timeout) { } - public Type PromiseType { + public Type ResultType { get { return typeof(void); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -76,7 +76,7 @@ } } - public Exception Error { + public Exception RejectReason { get { return null; }
--- a/Implab/SuccessPromiseT.cs Tue Jan 23 19:39:21 2018 +0300 +++ b/Implab/SuccessPromiseT.cs Wed Jan 24 03:03:21 2018 +0300 @@ -119,13 +119,13 @@ void IPromise.Join(int timeout) { } - public Type PromiseType { + public Type ResultType { get { return typeof(T); } } - public bool IsResolved { + public bool IsFulfilled { get { return true; } @@ -137,7 +137,7 @@ } } - public Exception Error { + public Exception RejectReason { get { return null; }
--- a/packages/repositories.config Tue Jan 23 19:39:21 2018 +0300 +++ b/packages/repositories.config Wed Jan 24 03:03:21 2018 +0300 @@ -1,5 +1,6 @@ <?xml version="1.0" encoding="utf-8"?> <repositories> <repository path="../Implab.Test/Implab.Format.Test/packages.config" /> + <repository path="../Implab.Test/packages.config" /> <repository path="../MonoPlay/packages.config" /> </repositories> \ No newline at end of file