Mercurial > pub > ImplabNet
changeset 76:c761fc982e1d v2
Refactoring of the IPromise<T> interface
Added tests
author | cin |
---|---|
date | Wed, 10 Sep 2014 17:53:05 +0400 |
parents | 4439140706d0 |
children | 91362ffbecf8 |
files | Implab.Fx/PromiseHelpers.cs Implab.Fx/Properties/AssemblyInfo.cs Implab.Test/AsyncTests.cs Implab/Diagnostics/TraceContext.cs Implab/ICancellable.cs Implab/IPromise.cs Implab/IPromiseT.cs Implab/Parallels/ArrayTraits.cs Implab/Promise.cs Implab/PromiseExtensions.cs Implab/Properties/AssemblyInfo.cs Implab/TaskController.cs |
diffstat | 12 files changed, 233 insertions(+), 195 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Fx/PromiseHelpers.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab.Fx/PromiseHelpers.cs Wed Sep 10 17:53:05 2014 +0400 @@ -17,71 +17,25 @@ /// <example> /// client /// .Get("description.txt") // returns a promise - /// .DirectToControl(m_ctl) // handle the promise in the thread of the control + /// .DispatchToControl(m_ctl) // handle the promise in the thread of the control /// .Then( /// description => m_ctl.Text = description // now it's safe /// ) /// </example> - public static Promise<T> DispatchToControl<T>(this Promise<T> that, Control ctl) + public static IPromise<T> DispatchToControl<T>(this IPromise<T> that, Control ctl) { - if (that == null) - throw new ArgumentNullException("that"); - if (ctl == null) - throw new ArgumentNullException("ctl"); + Safe.ArgumentNotNull(that, "that"); + Safe.ArgumentNotNull(ctl, "ctl"); var directed = new ControlBoundPromise<T>(ctl,that,true); - that.Then( + that.Last( directed.Resolve, - err => - { - directed.Reject(err); - return default(T); - } + directed.Reject, + directed.Cancel ); return directed; } - - /// <summary> - /// Направляет обработку обещания в текущий поток, если у него существует контекст синхронизации. - /// </summary> - /// <typeparam name="T">Тип результата обещания.</typeparam> - /// <param name="that">Обещание которое нужно обработать в текущем потоке.</param> - /// <returns>Перенаправленное обещание.</returns> - public static Promise<T> DispatchToCurrentThread<T>(this Promise<T> that) - { - var sync = SynchronizationContext.Current; - if (sync == null) - throw new InvalidOperationException("The current thread doesn't have a syncronization context"); - return DispatchToSyncContext(that, sync); - } - - /// <summary> - /// Направляет обработку обещания в указанный контекст синхронизации. - /// </summary> - /// <typeparam name="T">Тип результата обещания.</typeparam> - /// <param name="that">Обещание, которое требуется обработать в указанном контексте синхронизации.</param> - /// <param name="sync">Контекст синхронизации в который будет направлено обещание.</param> - /// <returns>Новое обещание, которое будет обрабатываться в указанном контексте.</returns> - public static Promise<T> DispatchToSyncContext<T>(this Promise<T> that, SynchronizationContext sync) - { - if (that == null) - throw new ArgumentNullException("that"); - if (sync == null) - throw new ArgumentNullException("sync"); - - var d = new Promise<T>(); - - that.Then( - res => sync.Post(state => d.Resolve(res), null), - err => { - sync.Post(state => d.Reject(err), null); - return default(T); - } - ); - - return d; - } } }
--- a/Implab.Fx/Properties/AssemblyInfo.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab.Fx/Properties/AssemblyInfo.cs Wed Sep 10 17:53:05 2014 +0400 @@ -32,5 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.0.0")] -[assembly: AssemblyFileVersion("1.0.0.0")] +[assembly: AssemblyVersion("2.0.*")]
--- a/Implab.Test/AsyncTests.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab.Test/AsyncTests.cs Wed Sep 10 17:53:05 2014 +0400 @@ -38,6 +38,38 @@ } [TestMethod] + public void CancelExceptionTest() { + var p = new Promise<bool>(); + p.Cancel(); + + var p2 = p.Cancelled(() => { + throw new ApplicationException("CANCELLED"); + }); + + try { + p2.Join(); + Assert.Fail(); + } catch (ApplicationException err) { + Assert.AreEqual("CANCELLED", err.InnerException.Message); + } + + } + + [TestMethod] + public void ContinueOnCancelTest() { + var p = new Promise<bool>(); + p.Cancel(); + + var p2 = p + .Cancelled(() => { + throw new ApplicationException("CANCELLED"); + }) + .Error(e => true); + + Assert.AreEqual(true, p2.Join()); + } + + [TestMethod] public void JoinSuccessTest() { var p = new Promise<int>(); p.Resolve(100); @@ -63,7 +95,7 @@ public void MapTest() { var p = new Promise<int>(); - var p2 = p.Map(x => x.ToString()); + var p2 = p.Then(x => x.ToString()); p.Resolve(100); Assert.AreEqual(p2.Join(), "100"); @@ -185,8 +217,8 @@ var stop = new ManualResetEvent(false); int total = 0; - int itemsPerWriter = 1000; - int writersCount = 3; + int itemsPerWriter = 10000; + int writersCount = 10; for (int i = 0; i < writersCount; i++) { Interlocked.Increment(ref writers); @@ -318,7 +350,7 @@ .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) - .Map(y => y) + .Then(y => y) .Cancelled(() => flags[1] = true) ) .Cancelled(() => flags[2] = true); @@ -341,7 +373,7 @@ // OperationCanceledException var p = PromiseHelper .Sleep(1, "Hi, HAL!") - .Chain(x => { + .Then(x => { // var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!"); // @@ -360,16 +392,15 @@ [TestMethod] public void ChainedCancel2Test() { // , - IPromise p = null; var pSurvive = new Promise<bool>(); var hemStarted = new ManualResetEvent(false); - p = PromiseHelper + var p = PromiseHelper .Sleep(1, "Hi, HAL!") .Chain(x => { hemStarted.Set(); // var result = PromiseHelper - .Sleep(1000, "HEM ENABLED!!!") + .Sleep(10000, "HEM ENABLED!!!") .Then(s => pSurvive.Resolve(false)); result
--- a/Implab/Diagnostics/TraceContext.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/Diagnostics/TraceContext.cs Wed Sep 10 17:53:05 2014 +0400 @@ -215,7 +215,7 @@ Safe.ArgumentNotNull(promise, "promise"); var ctx = DetachLogicalOperation(); - promise.Finally(() => { + promise.Anyway(() => { var old = _current; TraceContext.Attach(ctx); TraceContext.Current.EndLogicalOperation();
--- a/Implab/ICancellable.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/ICancellable.cs Wed Sep 10 17:53:05 2014 +0400 @@ -5,6 +5,6 @@ namespace Implab { public interface ICancellable { - bool Cancel(); + void Cancel(); } }
--- a/Implab/IPromise.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/IPromise.cs Wed Sep 10 17:53:05 2014 +0400 @@ -52,7 +52,7 @@ /// </summary> /// <param name="handler">Обработчик.</param> /// <remarks>После обработке ошибки, она передается дальше.</remarks> - IPromise Finally(Action handler); + IPromise Anyway(Action handler); /// <summary> /// Обработчик для регистрации отмены обещания, событие отмены не может быть подавлено. /// </summary>
--- a/Implab/IPromiseT.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/IPromiseT.cs Wed Sep 10 17:53:05 2014 +0400 @@ -4,34 +4,40 @@ using System.Text; namespace Implab { - public interface IPromise<T>: IPromise { + public interface IPromise<T> : IPromise { new T Join(); new T Join(int timeout); + void Last(ResultHandler<T> success, ErrorHandler error, Action cancel); + + void Last(ResultHandler<T> success, ErrorHandler error); + + void Last(ResultHandler<T> success); + IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error, Action cancel); IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error); IPromise<T> Then(ResultHandler<T> success); - void Last(ResultHandler<T> success, ErrorHandler error, Action cancel); - void Last(ResultHandler<T> success, ErrorHandler error); - void Last(ResultHandler<T> success); + IPromise<T2> Then<T2>(ResultMapper<T, T2> mapper, ErrorHandler<T2> error, Action cancel); + + IPromise<T2> Then<T2>(ResultMapper<T, T2> mapper, ErrorHandler<T2> error); + + IPromise<T2> Then<T2>(ResultMapper<T, T2> mapper); + + IPromise<T2> Chain<T2>(ResultMapper<T, IPromise<T2>> chained, ErrorHandler<IPromise<T2>> error, Action cancel); + + IPromise<T2> Chain<T2>(ResultMapper<T, IPromise<T2>> chained, ErrorHandler<IPromise<T2>> error); + + IPromise<T2> Chain<T2>(ResultMapper<T, IPromise<T2>> chained); IPromise<T> Error(ErrorHandler<T> error); - IPromise<T2> Then<T2>(ResultMapper<T,T2> mapper, ErrorHandler<T> error); - - IPromise<T2> Then<T2>(ResultMapper<T,T2> mapper); - - IPromise<T2> Then<T2>(ChainedOperation<T, T2> chained, ErrorHandler<T> error); - - IPromise<T2> Then<T2>(ChainedOperation<T, T2> chained); - new IPromise<T> Cancelled(Action handler); - new IPromise<T> Finally(Action handler); + new IPromise<T> Anyway(Action handler); } }
--- a/Implab/Parallels/ArrayTraits.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Wed Sep 10 17:53:05 2014 +0400 @@ -29,7 +29,7 @@ m_pending = source.Length; m_action = action; - m_promise.Finally(Dispose); + m_promise.Anyway(Dispose); InitPool(); } @@ -85,7 +85,7 @@ m_transform = transform; m_traceContext = TraceContext.Snapshot(); - m_promise.Finally(Dispose); + m_promise.Anyway(Dispose); InitPool(); } @@ -138,7 +138,7 @@ return iter.Promise; } - public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) { if (source == null) throw new ArgumentNullException("source"); if (transform == null) @@ -165,7 +165,7 @@ semaphore.WaitOne(); try { var p1 = transform(source[i]); - p1.Finally(() => semaphore.Release()); + p1.Anyway(() => semaphore.Release()); p1.Then( x => { res[idx] = x; @@ -186,7 +186,7 @@ return 0; }); - return promise.Finally(semaphore.Dispose); + return promise.Anyway(semaphore.Dispose); } }
--- a/Implab/Promise.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/Promise.cs Wed Sep 10 17:53:05 2014 +0400 @@ -11,7 +11,6 @@ public delegate T ErrorHandler<out T>(Exception e); public delegate void ResultHandler<in T>(T result); public delegate TNew ResultMapper<in TSrc,out TNew>(TSrc result); - public delegate IPromise<TNew> ChainedOperation<in TSrc,TNew>(TSrc result); /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -121,10 +120,15 @@ public Promise(IPromise parent, bool cancellable) { m_cancellable = cancellable; if (parent != null) - Cancelled(() => { - if (parent.IsExclusive) - parent.Cancel(); - }); + AddHandler( + null, + null, + () => { + if (parent.IsExclusive) + parent.Cancel(); + }, + null + ); } bool BeginTransit() { @@ -210,22 +214,14 @@ /// <summary> /// Отменяет операцию, если это возможно. /// </summary> - /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> - public bool Cancel() { + /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> + public void Cancel() { if (m_cancellable && BeginTransit()) { CompleteTransit(CANCELLED_STATE); OnStateChanged(); - return true; } - return false; } - // сделано для возвращаемого типа void - protected void InternalCancel() { - Cancel(); - } - - public IPromise<T> Then(ResultHandler<T> success, ErrorHandler<T> error, Action cancel) { if (success == null && error == null && cancel == null) return this; @@ -255,30 +251,7 @@ return medium; } - public IPromise Then(Action success, ErrorHandler error, Action cancel) { - return Then( - x => success(), - e => { - error(e); - return default(T); - }, - cancel - ); - } - - public IPromise Then(Action success, ErrorHandler error) { - return Then( - x => success(), - e => { - error(e); - return default(T); - } - ); - } - - public IPromise Then(Action success) { - return Then(x => success()); - } + public IPromise<T> Then(ResultHandler<T> success) { @@ -292,6 +265,23 @@ return medium; } + /// <summary> + /// Последний обработчик в цепочки обещаний. + /// </summary> + /// <param name="success"></param> + /// <param name="error"></param> + /// <param name="cancel"></param> + /// <remarks> + /// <para> + /// Данный метод не создает связанного с текущим обещания и предназначен для окончания + /// фсинхронной цепочки. + /// </para> + /// <para> + /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка + /// не будет одиночной <see cref="IsExclusive"/> и, как следствие, будет невозможна отмена + /// всей цепи обещаний снизу (с самого последнего обещания). + /// </para> + /// </remarks> public void Last(ResultHandler<T> success, ErrorHandler error, Action cancel) { if (success == null && error == null && cancel == null) return; @@ -313,18 +303,6 @@ Last(success, null, null); } - public void Last(Action success,ErrorHandler error, Action cancel) { - Last(x => success(), error, cancel); - } - - public void Last(Action success,ErrorHandler error) { - Last(x => success(), error, null); - } - - public void Last(Action success) { - Last(x => success(), null, null); - } - public IPromise Error(ErrorHandler error) { if (error == null) return this; @@ -371,44 +349,56 @@ /// <param name="error">Обработчик ошибки. Данный обработчик получит /// исключение возникшее при выполнении операции.</param> /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns> - public IPromise<TNew> Then<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler<T> error) { - if (mapper == null) - throw new ArgumentNullException("mapper"); + public IPromise<TNew> Then<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler<TNew> error, Action cancel) { + Safe.ArgumentNotNull(mapper, "mapper"); + + // создаем прицепленное обещание + var medium = new Promise<TNew>(this, true); - // создаем прицепленное обещание - var chained = new Promise<TNew>(this, true); - - ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result)); + ResultHandler<T> resultHandler = result => medium.Resolve(mapper(result)); ErrorHandler<T> errorHandler; if (error != null) errorHandler = e => { try { - return error(e); + medium.Resolve(error(e)); } catch (Exception e2) { // в случае ошибки нужно передать исключение дальше по цепочке - chained.Reject(e2); + medium.Reject(e2); } return default(T); }; else errorHandler = e => { - chained.Reject(e); + medium.Reject(e); return default(T); }; + Action cancelHandler; + if (cancel != null) + cancelHandler = () => { + cancel(); + medium.Cancel(); + }; + else + cancelHandler = medium.Cancel; + AddHandler( resultHandler, errorHandler, - chained.InternalCancel, + cancelHandler, null ); - return chained; + return medium; + } + + public IPromise<TNew> Then<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler<TNew> error) { + return Then(mapper, error, null); } public IPromise<TNew> Then<TNew>(ResultMapper<T, TNew> mapper) { - return Then(mapper, null); + return Then(mapper, null, null); } /// <summary> @@ -421,7 +411,9 @@ /// <param name="error">Обработчик ошибки. Данный обработчик получит /// исключение возникшее при выполнении текуещй операции.</param> /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns> - public IPromise<TNew> Then<TNew>(ChainedOperation<T, TNew> chained, ErrorHandler<T> error) { + public IPromise<TNew> Chain<TNew>(ResultMapper<T, IPromise<TNew>> chained, ErrorHandler<IPromise<TNew>> error, Action cancel) { + + Safe.ArgumentNotNull(chained, "chained"); // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно // создать посредника, к которому будут подвызяваться следующие обработчики. @@ -435,12 +427,10 @@ var promise = chained(result); - promise.Then( + promise.Last( medium.Resolve, - err => { - medium.Reject(err); - throw new TransientPromiseException(err); - } + medium.Reject, + () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка ); // notify chained operation that it's not needed anymore @@ -450,41 +440,70 @@ if (promise.IsExclusive) promise.Cancel(); }); - - // внешняя отмена связанной операции рассматривается как ошибка - promise.Cancelled(() => medium.Reject(new OperationCanceledException())); }; - ErrorHandler<T> errorHandler = delegate(Exception e) { - if (error != null) { + ErrorHandler<T> errorHandler; + + if (error != null) + errorHandler = delegate(Exception e) { try { - return error(e); + var promise = error(e); + + promise.Last( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка + ); + + // notify chained operation that it's not needed anymore + // порядок вызова Then, Cancelled важен, поскольку от этого + // зависит IsExclusive + medium.Cancelled(() => { + if (promise.IsExclusive) + promise.Cancel(); + }); } catch (Exception e2) { medium.Reject(e2); - return default(T); } - } - // в случае ошибки нужно передать исключение дальше по цепочке - medium.Reject(e); - return default(T); - }; + return default(T); + }; + else + errorHandler = err => { + medium.Reject(err); + return default(T); + }; + + + Action cancelHandler; + if (cancel != null) + cancelHandler = () => { + if (cancel != null) + cancel(); + medium.Cancel(); + }; + else + cancelHandler = medium.Cancel; AddHandler( resultHandler, errorHandler, - medium.InternalCancel, + cancelHandler, null ); return medium; } - public IPromise<TNew> Then<TNew>(ChainedOperation<T, TNew> chained) { - return Then(chained, null); + public IPromise<TNew> Chain<TNew>(ResultMapper<T, IPromise<TNew>> chained, ErrorHandler<IPromise<TNew>> error) { + return Chain(chained, error, null); + } + + public IPromise<TNew> Chain<TNew>(ResultMapper<T, IPromise<TNew>> chained) { + return Chain(chained, null, null); } public IPromise<T> Cancelled(Action handler) { - var medium = new Promise<T>(this, true); + var medium = new Promise<T>(this,true); AddHandler(null, null, handler, medium); return medium; } @@ -494,9 +513,9 @@ /// </summary> /// <param name="handler">The handler that will be called anyway</param> /// <returns>self</returns> - public IPromise<T> Finally(Action handler) { - if (handler == null) - throw new ArgumentNullException("handler"); + public IPromise<T> Anyway(Action handler) { + Safe.ArgumentNotNull(handler, "handler"); + AddHandler( x => handler(), e => { @@ -541,7 +560,7 @@ /// <returns>Результат выполнения обещания</returns> public T Join(int timeout) { var evt = new ManualResetEvent(false); - Finally(() => evt.Set()); + Anyway(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); @@ -736,12 +755,49 @@ #region IPromiseBase explicit implementation + IPromise IPromise.Then(Action success, ErrorHandler error, Action cancel) { + return Then( + x => success(), + e => { + error(e); + return default(T); + }, + cancel + ); + } + + IPromise IPromise.Then(Action success, ErrorHandler error) { + return Then( + x => success(), + e => { + error(e); + return default(T); + } + ); + } + + IPromise IPromise.Then(Action success) { + return Then(x => success()); + } + + void IPromise.Last(Action success, ErrorHandler error, Action cancel) { + Last(x => success(), error, cancel); + } + + void IPromise.Last(Action success, ErrorHandler error) { + Last(x => success(), error, null); + } + + void IPromise.Last(Action success) { + Last(x => success(), null, null); + } + IPromise IPromise.Error(ErrorHandler error) { return Error(error); } - IPromise IPromise.Finally(Action handler) { - return Finally(handler); + IPromise IPromise.Anyway(Action handler) { + return Anyway(handler); } IPromise IPromise.Cancelled(Action handler) {
--- a/Implab/PromiseExtensions.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/PromiseExtensions.cs Wed Sep 10 17:53:05 2014 +0400 @@ -14,12 +14,10 @@ var p = new SyncContextPromise<T>(context, that, true); - that.Then( - x => p.Resolve(x), - e => { - p.Reject(e); - return default(T); - } + that.Last( + p.Resolve, + p.Reject, + p.Cancel ); return p; } @@ -30,12 +28,10 @@ var p = new SyncContextPromise<T>(context, that, true); - that.Then( - x => p.Resolve(x), - e => { - p.Reject(e); - return default(T); - } + that.Last( + p.Resolve, + p.Reject, + p.Cancel ); return p; }
--- a/Implab/Properties/AssemblyInfo.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/Properties/AssemblyInfo.cs Wed Sep 10 17:53:05 2014 +0400 @@ -16,7 +16,7 @@ // The form "{Major}.{Minor}.*" will automatically update the build and revision, // and "{Major}.{Minor}.{Build}.*" will update just the revision. -[assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("2.0.*")] [assembly: ComVisible(false)] // The following attributes are used to specify the signing key for the assembly,
--- a/Implab/TaskController.cs Wed Sep 10 11:17:37 2014 +0400 +++ b/Implab/TaskController.cs Wed Sep 10 17:53:05 2014 +0400 @@ -92,14 +92,10 @@ } } - public bool Cancel() { + public void Cancel() { lock (m_lock) { - if (!m_cancelled) { + if (!m_cancelled) m_cancelled = true; - return true; - } else { - return false; - } } }