Mercurial > pub > ImplabNet
view Implab/PromiseT.cs @ 120:f1b897999260 v2
improved asyncpool usability
working on batch operations on asyncqueue
author | cin |
---|---|
date | Mon, 12 Jan 2015 05:19:52 +0300 |
parents | 2573b562e328 |
children | 671f60cd0250 |
line wrap: on
line source
using System; using System.Diagnostics; namespace Implab { /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". /// </summary> /// <typeparam name="T">Тип получаемого результата</typeparam> /// <remarks> /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции, /// клиент получив такое обещание может установить ряд обратных вызово для получения /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para> /// <para> /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на /// данные события клиент должен использовать методы <c>Then</c>. /// </para> /// <para> /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о /// выполнении обещания. /// </para> /// <para> /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался, /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении /// обещания. /// </para> /// <para> /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует /// использовать соответствующую форму методе <c>Then</c>. /// </para> /// <para> /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать /// только инициатор обещания иначе могут возникнуть противоречия. /// </para> /// </remarks> public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> { class StubDeferred : IDeferred<T> { public static readonly StubDeferred instance = new StubDeferred(); StubDeferred() { } #region IDeferred implementation public void Resolve(T value) { } public void Reject(Exception error) { } #endregion #region ICancellable implementation public void Cancel() { } #endregion } class RemapDescriptor<T2> : IDeferred<T> { readonly Func<T,T2> m_remap; readonly Func<Exception,T2> m_failed; readonly Func<T2> m_cancel; readonly IDeferred<T2> m_deferred; public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) { Debug.Assert(deferred != null); m_remap = remap; m_failed = failed; m_cancel = cancel; m_deferred = deferred; } #region IDeferred implementation public void Resolve(T value) { if (m_remap != null) { try { m_deferred.Resolve(m_remap(value)); } catch (Exception ex) { Reject(ex); } } } public void Reject(Exception error) { if (m_failed != null) { try { m_deferred.Resolve(m_failed(error)); } catch (Exception ex) { m_deferred.Reject(ex); } } else { m_deferred.Reject(error); } } #endregion #region ICancellable implementation public void Cancel() { if (m_cancel != null) { try { m_deferred.Resolve(m_cancel()); } catch (Exception ex) { Reject(ex); } } else { m_deferred.Cancel(); } } #endregion } class ListenerDescriptor : IDeferred<T> { readonly Action m_handler; readonly PromiseEventType m_events; public ListenerDescriptor(Action handler, PromiseEventType events) { Debug.Assert(handler != null); m_handler = handler; m_events = events; } #region IDeferred implementation public void Resolve(T value) { if (m_events.HasFlag(PromiseEventType.Success)) { try { m_handler(); // Analysis disable once EmptyGeneralCatchClause } catch { } } } public void Reject(Exception error) { if (m_events.HasFlag(PromiseEventType.Error)){ try { m_handler(); // Analysis disable once EmptyGeneralCatchClause } catch { } } } #endregion #region ICancellable implementation public void Cancel() { if (m_events.HasFlag(PromiseEventType.Cancelled)){ try { m_handler(); // Analysis disable once EmptyGeneralCatchClause } catch { } } } #endregion } class ValueEventDescriptor : IDeferred<T> { readonly Action<T> m_success; readonly Action<Exception> m_failed; readonly Action m_cancelled; readonly IDeferred<T> m_deferred; public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) { Debug.Assert(deferred != null); m_success = success; m_failed = failed; m_cancelled = cancelled; m_deferred = deferred; } #region IDeferred implementation public void Resolve(T value) { if (m_success != null) { try { m_success(value); m_deferred.Resolve(value); } catch (Exception ex) { Reject(ex); } } } public void Reject(Exception error) { if (m_failed != null) { try { m_failed(error); m_deferred.Resolve(default(T)); } catch(Exception ex) { m_deferred.Reject(ex); } } else { m_deferred.Reject(error); } } #endregion #region ICancellable implementation public void Cancel() { if (m_cancelled != null) { try { m_cancelled(); m_deferred.Resolve(default(T)); } catch(Exception ex) { Reject(ex); } } else { m_deferred.Cancel(); } } #endregion } public class EventDescriptor : IDeferred<T> { readonly Action m_success; readonly Action<Exception> m_failed; readonly Action m_cancelled; readonly IDeferred<T> m_deferred; public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) { Debug.Assert(deferred != null); m_success = success; m_failed = failed; m_cancelled = cancelled; m_deferred = deferred; } #region IDeferred implementation public void Resolve(T value) { if (m_success != null) { try { m_success(); m_deferred.Resolve(value); } catch (Exception ex) { Reject(ex); } } } public void Reject(Exception error) { if (m_failed != null) { try { m_failed(error); m_deferred.Resolve(default(T)); }catch (Exception ex) { m_deferred.Reject(ex); } } else { m_deferred.Reject(error); } } #endregion #region ICancellable implementation public void Cancel() { if (m_cancelled != null) { try { m_cancelled(); m_deferred.Resolve(default(T)); } catch (Exception ex) { Reject(ex); } } else { m_deferred.Cancel(); } } #endregion } T m_result; public virtual void Resolve(T value) { BeginSetResult(); m_result = value; EndSetResult(); } public void Reject(Exception error) { SetError(error); } public Type PromiseType { get { return typeof(T); } } public new T Join() { WaitResult(-1); return m_result; } public new T Join(int timeout) { WaitResult(timeout); return m_result; } public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) { AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance)); return this; } public IPromise<T> On(Action<T> success, Action<Exception> error) { AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance)); return this; } public IPromise<T> On(Action<T> success) { AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance)); return this; } public IPromise<T> On(Action handler, PromiseEventType events) { Listen(events, handler); return this; } public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) { var promise = new Promise<T2>(); AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise)); return promise; } public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) { var promise = new Promise<T2>(); AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise)); return promise; } public IPromise<T2> Then<T2>(Func<T, T2> mapper) { var promise = new Promise<T2>(); AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise)); return promise; } public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) { // this promise will be resolved when an asyc operation is started var promise = new Promise<IPromise<T2>>(); AddHandler(new RemapDescriptor<IPromise<T2>>( chained, error, cancel, promise )); var medium = new Promise<T2>(); if (chained != null) medium.On(Cancel, PromiseEventType.Cancelled); // we need to connect started async operation with the medium // if the async operation hasn't been started by the some reason // report is to the medium promise.On( result => ConnectPromise<T2>(result, medium), medium.Reject, medium.Cancel ); return medium; } static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) { if (result != null) { result.On( medium.Resolve, medium.Reject, () => medium.Reject(new OperationCanceledException()) ); medium.On(result.Cancel, PromiseEventType.Cancelled); } else { medium.Reject( new NullReferenceException( "The chained asynchronous operation returned" + " 'null' where the promise instance is expected" ) ); } } public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) { return Chain(chained, error, null); } public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) { return Chain(chained, null, null); } public IPromise<T2> Error<T2>(Func<Exception, T2> error) { var promise = new Promise<T2>(); if (error != null) On( (Action<T>)null, ex => { try { promise.Resolve(error(ex)); } catch (Exception ex2) { promise.Reject(ex2); } } ); else Listen(PromiseEventType.Error, () => promise.Resolve(default(T2))); return promise; } public IPromise<T2> Cancelled<T2>(Func<T2> handler) { var promise = new Promise<T2>(); if (handler != null) On( (Action<T>)null, null, () => { try { promise.Resolve(handler()); } catch (Exception ex) { promise.Reject(ex); } }); else Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2))); return promise; } public IPromise Then(Action success, Action<Exception> error, Action cancel) { var promise = new Promise<T>(); if (success != null) promise.On(Cancel, PromiseEventType.Cancelled); AddHandler(new EventDescriptor(success, error, cancel, promise)); return promise; } public IPromise Then(Action success, Action<Exception> error) { return Then(success, error, null); } public IPromise Then(Action success) { return Then(success, null, null); } public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) { var promise = new Promise<IPromise>(); AddHandler( new RemapDescriptor<IPromise>( x => chained(), error, cancel, promise ) ); var medium = new Promise(); if (chained != null) medium.On(Cancel, PromiseEventType.Cancelled); promise.On( result => ConnectPromise(result, medium), medium.Reject, medium.Cancel ); return medium; } static void ConnectPromise(IPromise result, Promise medium) { if (result != null) { result.On( medium.Resolve, medium.Reject, () => medium.Reject(new OperationCanceledException()) ); medium.On(result.Cancel, PromiseEventType.Cancelled); } else { medium.Reject( new NullReferenceException( "The chained asynchronous operation returned" + " 'null' where the promise instance is expected" ) ); } } public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) { return Chain(chained, error, null); } public IPromise Chain(Func<IPromise> chained) { return Chain(chained, null, null); } public IPromise On(Action success, Action<Exception> error, Action cancel) { AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance)); return this; } public IPromise On(Action success, Action<Exception> error) { AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance)); return this; } public IPromise On(Action success) { Listen(PromiseEventType.Success, success); return this; } IPromise IPromise.On(Action handler, PromiseEventType events) { Listen(events,handler); return this; } public IPromise Error(Action<Exception> error) { var promise = new Promise(); if (error != null) On( (Action<T>)null, ex => { try { error(ex); promise.Resolve(); } catch (Exception ex2) { promise.Reject(ex2); } }); else Listen(PromiseEventType.Error, promise.Resolve); return promise; } public IPromise Cancelled(Action handler) { var promise = new Promise(); if (handler != null) On( (Action<T>)null, null, () => { try { handler(); promise.Resolve(); } catch (Exception ex) { promise.Reject(ex); } }); else Listen(PromiseEventType.Cancelled, promise.Resolve); return promise; } public IPromise<T2> Cast<T2>() { return (IPromise<T2>)this; } #region implemented abstract members of AbstractPromise protected override void SignalSuccess(IDeferred<T> handler) { handler.Resolve(m_result); } protected override void SignalError(IDeferred<T> handler, Exception error) { handler.Reject(error); } protected override void SignalCancelled(IDeferred<T> handler) { handler.Cancel(); } protected override void Listen(PromiseEventType events, Action handler) { if (handler != null) AddHandler(new ListenerDescriptor(handler, events)); } #endregion public static IPromise<T> ResultToPromise(T value) { var p = new Promise<T>(); p.Resolve(value); return p; } public static IPromise<T> ExceptionToPromise(Exception error) { var p = new Promise<T>(); p.Reject(error); return p; } } }