Mercurial > pub > ImplabNet
changeset 106:d4e38929ce36 v2
promises refactoring
author | cin |
---|---|
date | Mon, 10 Nov 2014 18:00:28 +0300 |
parents | 4d308952fd5e |
children | f5220e5472ef |
files | Implab.Fx/AnimationHelpers.cs Implab.Fx/ControlBoundPromise.cs Implab.Test/AsyncTests.cs Implab/IPromiseT.cs Implab/Implab.csproj Implab/Parallels/MTCustomQueue.cs Implab/Parallels/MTCustomQueueNode.cs Implab/Promise.cs Implab/SyncContextPromise.cs |
diffstat | 9 files changed, 252 insertions(+), 144 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Fx/AnimationHelpers.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab.Fx/AnimationHelpers.cs Mon Nov 10 18:00:28 2014 +0300 @@ -42,7 +42,13 @@ { var anim = ctl.AnimateTransparency(0); - return anim.Play().DispatchToControl(ctl).Then(frm => frm.Close()); + return anim + .Play() + .DispatchToControl(ctl) + .Then(frm => { + frm.Close(); + return frm; + }); } public static IPromise<T> OverlayFadeIn<T>(this Form that, T overlay) where T : Form
--- a/Implab.Fx/ControlBoundPromise.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab.Fx/ControlBoundPromise.cs Mon Nov 10 18:00:28 2014 +0300 @@ -19,9 +19,9 @@ m_target = target; } - protected override void InvokeHandler(HandlerDescriptor handler) { + protected override void InvokeHandler(AbstractHandler handler) { if (m_target.InvokeRequired) - m_target.BeginInvoke(new Action<HandlerDescriptor>(base.InvokeHandler), handler); + m_target.BeginInvoke(new Action<AbstractHandler>(base.InvokeHandler), handler); else base.InvokeHandler(handler); }
--- a/Implab.Test/AsyncTests.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab.Test/AsyncTests.cs Mon Nov 10 18:00:28 2014 +0300 @@ -426,8 +426,11 @@ hemStarted.Set(); // запускаем две асинхронные операции var result = PromiseHelper - .Sleep(10000, "HEM ENABLED!!!") - .Then(s => pSurvive.Resolve(false)); + .Sleep(100000000, "HEM ENABLED!!!") + .Then(s => { + pSurvive.Resolve(false); + return s; + }); result .Cancelled(() => pSurvive.Resolve(true));
--- a/Implab/IPromiseT.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab/IPromiseT.cs Mon Nov 10 18:00:28 2014 +0300 @@ -13,12 +13,6 @@ void On(Action<T> success); - IPromise<T> Then(Action<T> success, Func<Exception,T> error, Action cancel); - - IPromise<T> Then(Action<T> success, Func<Exception,T> error); - - IPromise<T> Then(Action<T> success); - IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Action cancel); IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error);
--- a/Implab/Implab.csproj Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab/Implab.csproj Mon Nov 10 18:00:28 2014 +0300 @@ -148,6 +148,8 @@ <Compile Include="IComponentContainer.cs" /> <Compile Include="MTComponentContainer.cs" /> <Compile Include="PromiseEventType.cs" /> + <Compile Include="Parallels\MTCustomQueue.cs" /> + <Compile Include="Parallels\MTCustomQueueNode.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/MTCustomQueue.cs Mon Nov 10 18:00:28 2014 +0300 @@ -0,0 +1,135 @@ +using System; +using System.Collections.Generic; +using System.Threading; +using System.Collections; + +namespace Implab.Parallels { + public class MTCustomQueue<TNode> : IEnumerable<TNode> where TNode : MTCustomQueueNode<TNode> { + TNode m_first; + TNode m_last; + + public void Enqueue(TNode next) { + Thread.MemoryBarrier(); + + var last = m_last; + + // Interlocaked.CompareExchange implies Thread.MemoryBarrier(); + // to ensure that the next node is completely constructed + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out TNode node) { + TNode first; + TNode next; + node = null; + + Thread.MemoryBarrier(); + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update the tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is the race condition + if (m_last == null) + // the queue is empty + return false; + // tail has been changed, we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null + // however the parallel writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null or if it has been + // updated by the writer already then we should just to give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } while (true); + + node = first; + return true; + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator<TNode> { + TNode m_current; + TNode m_first; + + public Enumerator(TNode first) { + m_first = first; + } + + #region IEnumerator implementation + + public bool MoveNext() { + m_current = m_current == null ? m_first : m_current.next; + return m_current != null; + } + + public void Reset() { + m_current = null; + } + + object IEnumerator.Current { + get { + if (m_current == null) + throw new InvalidOperationException(); + return m_current; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public TNode Current { + get { + if (m_current == null) + throw new InvalidOperationException(); + return m_current; + } + } + + #endregion + } + + public IEnumerator<TNode> GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/MTCustomQueueNode.cs Mon Nov 10 18:00:28 2014 +0300 @@ -0,0 +1,6 @@ +namespace Implab.Parallels { + public class MTCustomQueueNode<TNode> where TNode : MTCustomQueueNode<TNode> { + public TNode next; + } +} +
--- a/Implab/Promise.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab/Promise.cs Mon Nov 10 18:00:28 2014 +0300 @@ -42,53 +42,68 @@ /// </remarks> public class Promise<T> : IPromise<T> { - protected struct HandlerDescriptor { - public Action<T> resultHandler; - public Func<Exception,T> errorHandler; - public Action cancellHandler; - public Promise<T> medium; + protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> { + public abstract void Resolve(T result); + public abstract void Reject(Exception error); + public abstract void Cancel(); + } + + protected class HandlerDescriptor<T2> : AbstractHandler { + + readonly Func<T,T2> m_resultHandler; + readonly Func<Exception,T2> m_errorHandler; + readonly Action m_cancellHandler; + readonly Promise<T2> m_medium; - public void Resolve(T result) { - if (resultHandler != null) { + public HandlerDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) { + m_resultHandler = resultHandler; + m_errorHandler = errorHandler; + m_cancellHandler = cancelHandler; + m_medium = medium; + } + + public override void Resolve(T result) { + if (m_resultHandler != null) { try { - resultHandler(result); + if (m_medium != null) + m_medium.Resolve(m_resultHandler(result)); + else + m_resultHandler(result); } catch (Exception e) { Reject(e); - return; } - } - if (medium != null) - medium.Resolve(result); + } else if(m_medium != null) + m_medium.Resolve(default(T2)); } - public void Reject(Exception err) { - if (errorHandler != null) { + public override void Reject(Exception error) { + if (m_errorHandler != null) { try { - var res = errorHandler(err); - if (medium != null) - medium.Resolve(res); + var res = m_errorHandler(error); + if (m_medium != null) + m_medium.Resolve(res); /*} catch (TransientPromiseException err2) { if (medium != null) medium.Reject(err2.InnerException);*/ } catch (Exception err2) { - if (medium != null) - medium.Reject(err2); + if (m_medium != null) + m_medium.Reject(err2); } - } else if (medium != null) - medium.Reject(err); + } else if (m_medium != null) + m_medium.Reject(error); } - public void Cancel() { - if (cancellHandler != null) { + public override void Cancel() { + if (m_cancellHandler != null) { try { - cancellHandler(); + m_cancellHandler(); } catch (Exception err) { Reject(err); return; } } - if (medium != null) - medium.Cancel(); + if (m_medium != null) + m_medium.Cancel(); } } @@ -103,14 +118,15 @@ T m_result; Exception m_error; - readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>(); + readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>(); + //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>(); public Promise() { } public Promise(IPromise parent) { if (parent != null) - AddHandler( + AddHandler<T>( null, null, () => { @@ -215,49 +231,6 @@ } } - public IPromise<T> Then(Action<T> success, Func<Exception,T> error, Action cancel) { - if (success == null && error == null && cancel == null) - return this; - - var medium = new Promise<T>(this); - - AddHandler(success, error, cancel, medium, true); - - return medium; - } - - /// <summary> - /// Adds new handlers to this promise. - /// </summary> - /// <param name="success">The handler of the successfully completed operation. - /// This handler will recieve an operation result as a parameter.</param> - /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param> - /// <returns>The new promise chained to this one.</returns> - public IPromise<T> Then(Action<T> success, Func<Exception,T> error) { - if (success == null && error == null) - return this; - - var medium = new Promise<T>(this); - - AddHandler(success, error, null, medium, true); - - return medium; - } - - - - - public IPromise<T> Then(Action<T> success) { - if (success == null) - return this; - - var medium = new Promise<T>(this); - - AddHandler(success, null, null, medium, true); - - return medium; - } - /// <summary> /// Последний обработчик в цепочки обещаний. /// </summary> @@ -279,13 +252,19 @@ if (success == null && error == null && cancel == null) return; - Func<Exception,T> errorHandler = null; - if (error != null) - errorHandler = err => { - error(err); + AddHandler( + success != null ? new Func<T,T>(x => { + success(x); + return x; + }) : null, + error != null ? new Func<Exception,T>(e => { + error(e); return default(T); - }; - AddHandler(success, errorHandler, cancel, null, false); + }) : null, + cancel, + null, + false + ); } public void On(Action<T> success, Action<Exception> error) { @@ -299,7 +278,10 @@ public void On(Action handler, PromiseEventType events) { Safe.ArgumentNotNull(handler, "handler"); - Action<T> success = events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null; + Func<T,T> success = events.HasFlag(PromiseEventType.Success) ? new Func<T,T>(x => { + handler(); + return x; + }) : null; Func<Exception,T> error = events.HasFlag(PromiseEventType.Error) ? new Func<Exception,T>(e => { handler(); return default(T); @@ -363,39 +345,11 @@ // создаем прицепленное обещание var medium = new Promise<TNew>(this); - Action<T> resultHandler = result => medium.Resolve(mapper(result)); - Func<Exception,T> errorHandler; - if (error != null) - errorHandler = e => { - try { - medium.Resolve(error(e)); - } catch (Exception e2) { - // в случае ошибки нужно передать исключение дальше по цепочке - medium.Reject(e2); - } - return default(T); - }; - else - errorHandler = e => { - medium.Reject(e); - return default(T); - }; - - Action cancelHandler; - if (cancel != null) - cancelHandler = () => { - cancel(); - medium.Cancel(); - }; - else - cancelHandler = medium.Cancel; - - AddHandler( - resultHandler, - errorHandler, - cancelHandler, - null, + mapper, + error, + cancel, + medium, true ); @@ -431,9 +385,9 @@ // передать через него результаты работы. var medium = new Promise<TNew>(this); - Action<T> resultHandler = delegate(T result) { + Func<T,T> resultHandler = delegate(T result) { if (medium.IsCancelled) - return; + return default(T); var promise = chained(result); @@ -454,6 +408,8 @@ promise.Cancel(); } ); + + return default(T); }; Func<Exception,T> errorHandler; @@ -534,7 +490,10 @@ var medium = new Promise<T>(this); AddHandler( - x => handler(), + x => { + handler(); + return x; + }, e => { handler(); throw new TransientPromiseException(e); @@ -600,16 +559,11 @@ return Join(Timeout.Infinite); } - void AddHandler(Action<T> success, Func<Exception,T> error, Action cancel, Promise<T> medium, bool inc) { + void AddHandler<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) { if (inc) Interlocked.Increment(ref m_childrenCount); - var handler = new HandlerDescriptor { - resultHandler = success, - errorHandler = error, - cancellHandler = cancel, - medium = medium - }; + AbstractHandler handler = new HandlerDescriptor<T2>(success, error, cancel, medium); bool queued; @@ -631,7 +585,7 @@ InvokeHandler(handler); } - protected virtual void InvokeHandler(HandlerDescriptor handler) { + protected virtual void InvokeHandler(AbstractHandler handler) { switch (m_state) { case SUCCEEDED_STATE: handler.Resolve(m_result); @@ -649,7 +603,7 @@ } void OnStateChanged() { - HandlerDescriptor handler; + AbstractHandler handler; while (m_handlers.TryDequeue(out handler)) InvokeHandler(handler); } @@ -688,16 +642,13 @@ var dest = i; if (promises[i] != null) { - promises[i].Then( + promises[i].On( x => { result[dest] = x; if (Interlocked.Decrement(ref pending) == 0) promise.Resolve(result); }, - e => { - promise.Reject(e); - return default(T); - } + promise.Reject ); } else { if (Interlocked.Decrement(ref pending) == 0) @@ -776,7 +727,10 @@ IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) { return Then( - success != null ? new Action<T>(x => success()) : null, + success != null ? new Func<T,T>(x => { + success(); + return x; + }) : null, error != null ? new Func<Exception,T>(e => { error(e); return default(T); @@ -787,7 +741,10 @@ IPromise IPromise.Then(Action success, Action<Exception> error) { return Then( - success != null ? new Action<T>(x => success()) : null, + success != null ? new Func<T,T>(x => { + success(); + return x; + }) : null, error != null ? new Func<Exception,T>(e => { error(e); return default(T); @@ -797,7 +754,10 @@ IPromise IPromise.Then(Action success) { Safe.ArgumentNotNull(success, "success"); - return Then(x => success()); + return Then(x => { + success(); + return x; + }); } IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) { @@ -809,9 +769,9 @@ var medium = new Promise<object>(this); - Action<T> resultHandler = delegate { + Func<T,T> resultHandler = delegate { if (medium.IsCancelled) - return; + return default(T); var promise = chained(); @@ -828,6 +788,8 @@ if (promise.IsExclusive) promise.Cancel(); }); + + return default(T); }; Func<Exception,T> errorHandler;
--- a/Implab/SyncContextPromise.cs Mon Nov 10 10:17:54 2014 +0300 +++ b/Implab/SyncContextPromise.cs Mon Nov 10 18:00:28 2014 +0300 @@ -14,7 +14,7 @@ Safe.ArgumentNotNull(context, "context"); m_context = context; } - protected override void InvokeHandler(HandlerDescriptor handler) { + protected override void InvokeHandler(AbstractHandler handler) { m_context.Post(x => base.InvokeHandler(handler),null); } }