Mercurial > pub > ImplabNet
changeset 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | e046a94eecb1 |
children | f1b897999260 |
files | Implab.Fx/ControlBoundPromise.cs Implab.Fx/PromiseHelpers.cs Implab.Test/AsyncTests.cs Implab/AbstractPromise.cs Implab/ComponentContainer.cs Implab/Diagnostics/Extensions.cs Implab/Diagnostics/TraceContext.cs Implab/DisposablePool.cs Implab/IDeferred.cs Implab/IDeferredT.cs Implab/IPromise.cs Implab/IPromiseT.cs Implab/Implab.csproj Implab/JSON/JSONWriter.cs Implab/ObjectPool.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/AsyncPool.cs Implab/Parallels/AsyncQueue.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs Implab/PromiseExtensions.cs Implab/PromiseT.cs Implab/PromiseTransientException.cs Implab/Safe.cs Implab/SyncContextPromise.cs Implab/TransientPromiseException.cs MonoPlay/Program.cs |
diffstat | 27 files changed, 1710 insertions(+), 1115 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Fx/ControlBoundPromise.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab.Fx/ControlBoundPromise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -12,19 +12,27 @@ m_target = target; } - public ControlBoundPromise(Control target, IPromise parent) - : base(parent) { - Safe.ArgumentNotNull(target, "target"); - - m_target = target; + protected override void SignalSuccess(IDeferred<T> handler) { + if (m_target.InvokeRequired) + m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalSuccess), handler); + else + base.SignalSuccess(handler); } - protected override void InvokeHandler(AbstractHandler handler) { + protected override void SignalCancelled(IDeferred<T> handler) { if (m_target.InvokeRequired) - m_target.BeginInvoke(new Action<AbstractHandler>(base.InvokeHandler), handler); + m_target.BeginInvoke(new Action<IDeferred<T>>(base.SignalCancelled), handler); else - base.InvokeHandler(handler); + base.SignalCancelled(handler); } + + protected override void SignalError(IDeferred<T> handler, Exception error) { + if (m_target.InvokeRequired) + m_target.BeginInvoke(new Action<IDeferred<T>,Exception>(base.SignalError), handler, error); + else + base.SignalError(handler, error); + } + } }
--- a/Implab.Fx/PromiseHelpers.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab.Fx/PromiseHelpers.cs Sun Jan 11 19:13:02 2015 +0300 @@ -27,7 +27,9 @@ Safe.ArgumentNotNull(that, "that"); Safe.ArgumentNotNull(ctl, "ctl"); - var directed = new ControlBoundPromise<T>(ctl,that); + var directed = new ControlBoundPromise<T>(ctl); + + directed.On(that.Cancel, PromiseEventType.Cancelled); that.On( directed.Resolve,
--- a/Implab.Test/AsyncTests.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab.Test/AsyncTests.cs Sun Jan 11 19:13:02 2015 +0300 @@ -72,7 +72,7 @@ p.Cancel(); var p2 = p - .Cancelled(() => { + .Cancelled<bool>(() => { throw new ApplicationException("CANCELLED"); }) .Error(e => true); @@ -195,11 +195,11 @@ .Invoke(() => 1) .Then(x => Interlocked.Add(ref count, x)) .Then(x => Math.Log10(x)) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref pending); if (pending == 0) stop.Set(); - }); + }, PromiseEventType.All); } stop.WaitOne(); @@ -255,7 +255,7 @@ } return 1; }) - .Anyway(() => Interlocked.Decrement(ref writers)); + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); } for (int i = 0; i < 10; i++) { @@ -269,11 +269,72 @@ } while (writers > 0); return 1; }) - .Anyway(() => { + .On(() => { Interlocked.Decrement(ref readers); if (readers == 0) stop.Set(); - }); + }, PromiseEventType.All); + } + + stop.WaitOne(); + + Assert.AreEqual(100000, total); + } + + [TestMethod] + public void AsyncQueueTest() { + var queue = new AsyncQueue<int>(); + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + const int itemsPerWriter = 10000; + const int writersCount = 10; + + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + AsyncPool + .InvokeNewThread(() => { + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + } + return 1; + }) + .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + AsyncPool + .InvokeNewThread(() => { + int t; + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + } while (writers > 0); + return 1; + }) + .On(() => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }, PromiseEventType.All); } stop.WaitOne(); @@ -371,15 +432,15 @@ var step1 = PromiseHelper .Sleep(200, "Alan") - .Cancelled(() => flags[0] = true); + .On(() => flags[0] = true, PromiseEventType.Cancelled); var p = step1 .Chain(x => PromiseHelper .Sleep(200, "Hi, " + x) .Then(y => y) - .Cancelled(() => flags[1] = true) + .On(() => flags[1] = true, PromiseEventType.Cancelled) ) - .Cancelled(() => flags[2] = true); + .On(() => flags[2] = true, PromiseEventType.Cancelled); step1.Join(); p.Cancel(); try {
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/AbstractPromise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,219 @@ +using System; +using Implab.Parallels; +using System.Threading; +using System.Reflection; + +namespace Implab { + public abstract class AbstractPromise<THandler> { + + const int UNRESOLVED_SATE = 0; + const int TRANSITIONAL_STATE = 1; + const int SUCCEEDED_STATE = 2; + const int REJECTED_STATE = 3; + const int CANCELLED_STATE = 4; + + int m_state; + Exception m_error; + + readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); + + #region state managment + bool BeginTransit() { + return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); + } + + void CompleteTransit(int state) { + if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) + throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); + } + + void WaitTransition() { + while (m_state == TRANSITIONAL_STATE) { + Thread.MemoryBarrier(); + } + } + + protected void BeginSetResult() { + if (!BeginTransit()) { + WaitTransition(); + if (m_state != CANCELLED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + protected void EndSetResult() { + CompleteTransit(SUCCEEDED_STATE); + OnSuccess(); + } + + + + /// <summary> + /// Выполняет обещание, сообщая об ошибке + /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> + /// <param name="error">Исключение возникшее при выполнении операции</param> + /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> + protected void SetError(Exception error) { + if (BeginTransit()) { + m_error = error is PromiseTransientException ? error.InnerException : error; + CompleteTransit(REJECTED_STATE); + OnError(); + } else { + WaitTransition(); + if (m_state == SUCCEEDED_STATE) + throw new InvalidOperationException("The promise is already resolved"); + } + } + + /// <summary> + /// Отменяет операцию, если это возможно. + /// </summary> + /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> + protected void SetCancelled() { + if (BeginTransit()) { + CompleteTransit(CANCELLED_STATE); + OnCancelled(); + } + } + + protected abstract void SignalSuccess(THandler handler); + + protected abstract void SignalError(THandler handler, Exception error); + + protected abstract void SignalCancelled(THandler handler); + + void OnSuccess() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalSuccess(handler); + } + + void OnError() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalError(handler,m_error); + } + + void OnCancelled() { + THandler handler; + while (m_handlers.TryDequeue(out handler)) + SignalCancelled(handler); + } + + #endregion + + protected abstract void Listen(PromiseEventType events, Action handler); + + #region synchronization traits + protected void WaitResult(int timeout) { + if (!IsResolved) { + var lk = new object(); + + Listen(PromiseEventType.All, () => { + lock(lk) { + Monitor.Pulse(lk); + } + }); + + lock (lk) { + while(!IsResolved) { + if(!Monitor.Wait(lk,timeout)) + throw new TimeoutException(); + } + } + + } + switch (m_state) { + case SUCCEEDED_STATE: + return; + case CANCELLED_STATE: + throw new OperationCanceledException(); + case REJECTED_STATE: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); + } + } + #endregion + + #region handlers managment + + protected void AddHandler(THandler handler) { + + if (IsResolved) { + InvokeHandler(handler); + + } else { + // the promise is in the resolved state, just invoke the handler + m_handlers.Enqueue(handler); + + + if (IsResolved && m_handlers.TryDequeue(out handler)) + // if the promise have been resolved while we was adding the handler to the queue + // we can't guarantee that someone is still processing it + // therefore we need to fetch a handler from the queue and execute it + // note that fetched handler may be not the one that we have added + // even we can fetch no handlers at all :) + InvokeHandler(handler); + } + } + + protected void InvokeHandler(THandler handler) { + switch (m_state) { + case SUCCEEDED_STATE: + SignalSuccess(handler); + break; + case CANCELLED_STATE: + SignalCancelled(handler); + break; + case REJECTED_STATE: + SignalError(handler, m_error); + break; + default: + throw new Exception(String.Format("Invalid promise state {0}", m_state)); + } + } + + #endregion + + #region IPromise implementation + + public void Join(int timeout) { + WaitResult(timeout); + } + + public void Join() { + WaitResult(-1); + } + + public bool IsResolved { + get { + Thread.MemoryBarrier(); + return m_state > 1; + } + } + + public bool IsCancelled { + get { + Thread.MemoryBarrier(); + return m_state == CANCELLED_STATE; + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + SetCancelled(); + } + + #endregion + } +} +
--- a/Implab/ComponentContainer.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/ComponentContainer.cs Sun Jan 11 19:13:02 2015 +0300 @@ -18,7 +18,7 @@ } bool m_disposed; - readonly MTQueue<IDisposable> m_components = new MTQueue<IDisposable>(); + readonly AsyncQueue<IDisposable> m_components = new AsyncQueue<IDisposable>(); public void Add(IDisposable item) { Safe.ArgumentNotNull(item, "item");
--- a/Implab/Diagnostics/Extensions.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Diagnostics/Extensions.cs Sun Jan 11 19:13:02 2015 +0300 @@ -1,23 +1,23 @@ -namespace Implab.Diagnostics { +using System; + +namespace Implab.Diagnostics { public static class Extensions { public static IPromise<T> EndLogicalOperation<T>(this IPromise<T> promise) { Safe.ArgumentNotNull(promise, "promise"); var op = TraceContext.Instance.DetachLogicalOperation(); - return promise.Then<T>( - x => { + return promise.On( + x => { TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.TraceInformation("promise = {0}", x); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - return x; }, err =>{ TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.TraceError("promise died {0}", err); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - throw new TransientPromiseException(err); }, () => { TraceContext.Instance.EnterLogicalOperation(op,true); @@ -32,11 +32,11 @@ Safe.ArgumentNotNull(promise, "promise"); var op = TraceContext.Instance.DetachLogicalOperation(); - return promise.Anyway(() => { + return promise.On(() => { TraceContext.Instance.EnterLogicalOperation(op,true); TraceLog.EndLogicalOperation(); TraceContext.Instance.Leave(); - }); + }, PromiseEventType.All); } } }
--- a/Implab/Diagnostics/TraceContext.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Diagnostics/TraceContext.cs Sun Jan 11 19:13:02 2015 +0300 @@ -74,7 +74,7 @@ m_current = m_stack.Pop(); LogChannel<TraceEvent>.Default.LogEvent(new TraceEvent(TraceEventType.Leave, String.Format("{0} -> {1}", prev.Name, CurrentOperation.Name))); } else { - TraceLog.TraceWarning("Attemtp to leave the last operation context"); + TraceLog.TraceWarning("Attempt to leave the last operation context"); m_current = OperationContext.EMPTY; } }
--- a/Implab/DisposablePool.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/DisposablePool.cs Sun Jan 11 19:13:02 2015 +0300 @@ -7,7 +7,7 @@ namespace Implab { public abstract class DisposablePool<T> : IDisposable { readonly int m_size; - readonly MTQueue<T> m_queue = new MTQueue<T>(); + readonly AsyncQueue<T> m_queue = new AsyncQueue<T>(); [SuppressMessage("Microsoft.Design", "CA1000:DoNotDeclareStaticMembersOnGenericTypes")] static readonly bool _isValueType = typeof(T).IsValueType;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IDeferred.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,14 @@ +using System; + +namespace Implab { + /// <summary> + /// Deferred result, usually used by asynchronous services as the service part of the promise. + /// </summary> + public interface IDeferred : ICancellable { + + void Resolve(); + + void Reject(Exception error); + } +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IDeferredT.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,10 @@ +using System; + +namespace Implab { + public interface IDeferred<T> : ICancellable { + void Resolve(T value); + + void Reject(Exception error); + } +} +
--- a/Implab/IPromise.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/IPromise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -5,12 +5,6 @@ namespace Implab { public interface IPromise: ICancellable { - /// <summary> - /// Check whereather the promise has no more than one dependent promise. - /// </summary> - bool IsExclusive { - get; - } /// <summary> /// Тип результата, получаемого через данное обещание. @@ -27,44 +21,98 @@ /// </summary> bool IsCancelled { get; } + /// <summary> + /// Creates a new promise dependend on the current one and resolved on + /// executing the specified handlers. + /// </summary> + /// <param name="success">The handler called on the successful promise completion.</param> + /// <param name="error">The handler is called if an error while completing the promise occurred.</param> + /// <param name="cancel">The handler is called in case of promise cancellation.</param> + /// <returns>The newly created dependant promise.</returns> + /// <remarks> + /// <para> + /// If the success handler is specified the dependend promise will be resolved after the handler is + /// executed and the dependent promise will be linked to the current one, i.e. the cancellation + /// of the dependent property will lead to the cancellation of the current promise. If the + /// success handler isn't specified the dependent promise will not be linked to and + /// will not be resolved after the successfull resolution of the current one. + /// </para> + /// <para> + /// When the error handler is specified, the exception raised during the current promise completion + /// will be passed to it as the parameter. If the error handler returns without raising an + /// exception then the dependant promise will be resolved successfully, otherwise the exception + /// raised by the handler will be transmitted to the dependent promise. If the handler wants + /// to passthrough the original exception it needs to wrap the exception with + /// the <see cref="PromiseTransientException"/>. + /// </para> + /// <para> + /// If the cancelation handler is specified and the current promise is cancelled then the dependent + /// promise will be resolved after the handler is executed. If the cancelation hendler raises the + /// exception it will be passed to the dependent promise. + /// </para> + /// </remarks> IPromise Then(Action success, Action<Exception> error, Action cancel); IPromise Then(Action success, Action<Exception> error); IPromise Then(Action success); - IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Action cancel); + IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel); IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error); IPromise Chain(Func<IPromise> chained); /// <summary> - /// Добавляет последнй обработчик в цепочку обещаний, не создает промежуточных обещаний. + /// Adds specified listeners to the current promise. /// </summary> - /// <param name="success">Success.</param> - /// <param name="error">Error.</param> - /// <param name="cancel">Cancel.</param> - void On(Action success, Action<Exception> error, Action cancel); - void On(Action success, Action<Exception> error); - void On(Action success); - void On(Action success, PromiseEventType events); + /// <param name="success">The handler called on the successful promise completion.</param> + /// <param name="error">The handler is called if an error while completing the promise occurred.</param> + /// <param name="cancel">The handler is called in case of promise cancellation.</param> + /// <returns>The current promise.</returns> + IPromise On(Action success, Action<Exception> error, Action cancel); + IPromise On(Action success, Action<Exception> error); + IPromise On(Action success); - IPromise Error(Action<Exception> error); /// <summary> - /// Обрабатывает либо ошибку, либо результат, либо отмену. + /// Adds specified listeners to the current promise. + /// </summary> + /// <param name="handler">The handler called on the specified events.</param> + /// <param name = "events">The combination of flags denoting the events for which the + /// handler shoud be called.</param> + /// <returns>The current promise.</returns> + IPromise On(Action handler, PromiseEventType events); + + /// <summary> + /// Adds the specified error handler to the current promise + /// and creates the new dependant promise. /// </summary> - /// <param name="handler">Обработчик.</param> - /// <remarks>После обработке ошибки, она передается дальше.</remarks> + /// <param name="error"> + /// The error handler. If the error handler returns without + /// an error the dependant promise will be successfully resolved. + /// </param> + /// <returns> + /// The new dependant promise which will be resolved after the error + /// handler is executed. + /// </returns> + /// <remarks> + /// The successfull result of the current promise will be ignored. + /// </remarks> + IPromise Error(Action<Exception> error); + /// <summary> - /// Обрабатывает либо ошибку, либо результат, либо отмену обещания. + /// Adds the specified cncellation handler to the current promise + /// and creates the new dependant promise. /// </summary> - /// <param name="handler">Обработчик.</param> - /// <remarks>После обработке ошибки, она передается дальше.</remarks> - IPromise Anyway(Action handler); - /// <summary> - /// Обработчик для регистрации отмены обещания. - /// </summary> - /// <returns>Новое обещание, связанное с текущим, выполнится после указанного обработчика.</returns> - /// <param name="handler">Обработчик события.</param> - /// <remarks>Если обработчик вызывает исключение, то оно передается обработчику ошибки, результат работы - /// которого будет передан связанному обещанию</remarks> + /// <returns> + /// The new dependant promise which will be resolved after the cancellation + /// handler is executed. + /// </returns> + /// <param name="handler"> + /// The cancellation handler. + /// </param> + /// <remarks> + /// If the cancellation handler is executed without an error the dependent + /// promise will be successfully resolved, otherwise the raised exception + /// will be passed to the dependant promise. The successful result of the + /// current promise will be ignored. + /// </remarks> IPromise Cancelled(Action handler); /// <summary>
--- a/Implab/IPromiseT.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/IPromiseT.cs Sun Jan 11 19:13:02 2015 +0300 @@ -1,34 +1,34 @@ using System; namespace Implab { - public interface IPromise<T> : IPromise { + public interface IPromise<out T> : IPromise { new T Join(); new T Join(int timeout); - void On(Action<T> success, Action<Exception> error, Action cancel); + IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel); - void On(Action<T> success, Action<Exception> error); + IPromise<T> On(Action<T> success, Action<Exception> error); - void On(Action<T> success); + IPromise<T> On(Action<T> success); - IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Action cancel); + new IPromise<T> On(Action handler, PromiseEventType events); + + IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error, Func<T2> cancel); IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception,T2> error); IPromise<T2> Then<T2>(Func<T, T2> mapper); - IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Action cancel); + IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error, Func<IPromise<T2>> cancel); IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception,IPromise<T2>> error); IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained); - IPromise<T> Error(Func<Exception,T> error); + IPromise<T2> Error<T2>(Func<Exception,T2> error); - new IPromise<T> Cancelled(Action handler); - - new IPromise<T> Anyway(Action handler); + IPromise<T2> Cancelled<T2>(Func<T2> handler); } }
--- a/Implab/Implab.csproj Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Implab.csproj Sun Jan 11 19:13:02 2015 +0300 @@ -7,6 +7,8 @@ <OutputType>Library</OutputType> <RootNamespace>Implab</RootNamespace> <AssemblyName>Implab</AssemblyName> + <ProductVersion>8.0.30703</ProductVersion> + <SchemaVersion>2.0</SchemaVersion> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> @@ -131,12 +133,10 @@ <Compile Include="TaskController.cs" /> <Compile Include="ProgressInitEventArgs.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> - <Compile Include="Promise.cs" /> <Compile Include="Parallels\AsyncPool.cs" /> <Compile Include="Safe.cs" /> <Compile Include="ValueEventArgs.cs" /> <Compile Include="PromiseExtensions.cs" /> - <Compile Include="TransientPromiseException.cs" /> <Compile Include="SyncContextPromise.cs" /> <Compile Include="Diagnostics\OperationContext.cs" /> <Compile Include="Diagnostics\TraceContext.cs" /> @@ -150,6 +150,13 @@ <Compile Include="ComponentContainer.cs" /> <Compile Include="DisposablePool.cs" /> <Compile Include="ObjectPool.cs" /> + <Compile Include="Parallels\AsyncQueue.cs" /> + <Compile Include="PromiseT.cs" /> + <Compile Include="IDeferred.cs" /> + <Compile Include="IDeferredT.cs" /> + <Compile Include="AbstractPromise.cs" /> + <Compile Include="Promise.cs" /> + <Compile Include="PromiseTransientException.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup />
--- a/Implab/JSON/JSONWriter.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/JSON/JSONWriter.cs Sun Jan 11 19:13:02 2015 +0300 @@ -1,9 +1,6 @@ using System; using System.Collections.Generic; using System.IO; -using System.Linq; -using System.Text; -using System.Threading.Tasks; namespace Implab.JSON { public class JSONWriter { @@ -23,7 +20,6 @@ _escapeCR, _escapeNL, _escapeTAB, - _escapeSLASH, _escapeBSLASH, _escapeQ; @@ -34,7 +30,6 @@ _escapeNL = "\\n".ToCharArray(); _escapeTAB = "\\t".ToCharArray(); _escapeBSLASH = "\\\\".ToCharArray(); - _escapeSLASH = "\\/".ToCharArray(); _escapeQ = "\\\"".ToCharArray(); } @@ -205,6 +200,7 @@ var chars = value.ToCharArray(); m_writer.Write('"'); + // Analysis disable once ForCanBeConvertedToForeach for (int i = 0; i < chars.Length; i++) { var ch = chars[i];
--- a/Implab/ObjectPool.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/ObjectPool.cs Sun Jan 11 19:13:02 2015 +0300 @@ -18,7 +18,7 @@ /// <para>Пул поддерживает обращения сразу из нескольких потоков.</para> /// </remarks> public abstract class ObjectPool<T> where T : class { - readonly MTQueue<WeakReference> m_queue = new MTQueue<WeakReference>(); + readonly AsyncQueue<WeakReference> m_queue = new AsyncQueue<WeakReference>(); readonly int m_size; int m_count = 0;
--- a/Implab/Parallels/ArrayTraits.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Parallels/ArrayTraits.cs Sun Jan 11 19:13:02 2015 +0300 @@ -26,7 +26,7 @@ m_pending = source.Length; m_action = action; - m_promise.Anyway(Dispose); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -86,7 +86,7 @@ m_transform = transform; m_logicalOperation = TraceContext.Instance.CurrentOperation; - m_promise.Anyway(Dispose); + m_promise.On(Dispose, PromiseEventType.All); InitPool(); } @@ -162,7 +162,7 @@ int slots = threads; // Analysis disable AccessToDisposedClosure - AsyncPool.InvokeNewThread(() => { + AsyncPool.InvokeNewThread<int>(() => { for (int i = 0; i < source.Length; i++) { if(promise.IsResolved) break; // stop processing in case of error or cancellation @@ -177,7 +177,7 @@ try { transform(source[i]) - .Anyway(() => { + .On( x => { Interlocked.Increment(ref slots); lock (locker) { Monitor.Pulse(locker);
--- a/Implab/Parallels/AsyncPool.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Parallels/AsyncPool.cs Sun Jan 11 19:13:02 2015 +0300 @@ -53,7 +53,7 @@ public static IPromise InvokeNewThread(Action func) { - var p = new Promise<object>(); + var p = new Promise(); var caller = TraceContext.Instance.CurrentOperation;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/AsyncQueue.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,244 @@ +using System.Threading; +using System.Collections.Generic; +using System; +using System.Collections; + +namespace Implab.Parallels { + public class AsyncQueue<T> : IEnumerable<T> { + class Chunk { + public Chunk next; + + int m_low; + int m_hi; + int m_alloc; + readonly int m_size; + readonly T[] m_data; + + public Chunk(int size) { + m_size = size; + m_data = new T[size]; + } + + public Chunk(int size, T value) { + m_size = size; + m_hi = 1; + m_alloc = 1; + m_data = new T[size]; + m_data[0] = value; + } + + public int Low { + get { return m_low; } + } + + public int Hi { + get { return m_hi; } + } + + public bool TryEnqueue(T value,out bool extend) { + extend = false; + int alloc; + do { + alloc = m_alloc; + if (alloc > m_size) + return false; + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + + if (alloc == m_size) { + extend = true; + return false; + } + + m_data[alloc] = value; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + // spin wait for commit + } + return true; + } + + public bool TryDequeue(out T value,out bool recycle) { + int low; + do { + low = m_low; + if (low >= m_hi) { + value = default(T); + recycle = (low == m_size); + return false; + } + } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + + recycle = (low == m_size - 1); + value = m_data[low]; + + return true; + } + + public T GetAt(int pos) { + return m_data[pos]; + } + } + + public const int DEFAULT_CHUNK_SIZE = 32; + + readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; + + Chunk m_first; + Chunk m_last; + + public AsyncQueue() { + m_last = m_first = new Chunk(m_chunkSize); + } + + public void Enqueue(T value) { + var last = m_last; + // spin wait to the new chunk + bool extend = true; + while(last == null || !last.TryEnqueue(value, out extend)) { + // try to extend queue + if (extend || last == null) { + var chunk = new Chunk(m_chunkSize, value); + if (EnqueueChunk(last, chunk)) + break; + last = m_last; + } else { + while (last != m_last) { + Thread.MemoryBarrier(); + last = m_last; + } + } + } + } + + public bool TryDequeue(out T value) { + var chunk = m_first; + bool recycle; + while (chunk != null) { + + var result = chunk.TryDequeue(out value, out recycle); + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else + return result; // this chunk is usable and returned actual result + + if (result) // this chunk is waste but the true result is always actual + return true; + + // try again + chunk = m_first; + } + + // the queue is empty + value = default(T); + return false; + } + + bool EnqueueChunk(Chunk last, Chunk chunk) { + if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) + return false; + + if (last != null) + last.next = chunk; + else + m_first = chunk; + return true; + } + + void RecycleFirstChunk(Chunk first) { + var next = first.next; + + if (next == null) { + // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // race + // maybe someone already recycled this chunk + // or a new chunk has been appedned to the queue + + return; // give up + } + // the tail is updated + } + + // we need to update the head + Interlocked.CompareExchange(ref m_first, next, first); + // if the head is already updated then give up + return; + + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator<T> { + Chunk m_current; + int m_pos = -1; + + public Enumerator(Chunk fisrt) { + m_current = fisrt; + } + + #region IEnumerator implementation + + public bool MoveNext() { + if (m_current == null) + return false; + + if (m_pos == -1) + m_pos = m_current.Low; + else + m_pos++; + if (m_pos == m_current.Hi) { + m_pos = 0; + m_current = m_current.next; + } + + return true; + } + + public void Reset() { + throw new NotSupportedException(); + } + + object IEnumerator.Current { + get { + return Current; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public T Current { + get { + if (m_pos == -1 || m_current == null) + throw new InvalidOperationException(); + return m_current.GetAt(m_pos); + } + } + + #endregion + } + + public IEnumerator<T> GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +}
--- a/Implab/Parallels/WorkerPool.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Parallels/WorkerPool.cs Sun Jan 11 19:13:02 2015 +0300 @@ -6,7 +6,7 @@ namespace Implab.Parallels { public class WorkerPool : DispatchPool<Action> { - MTQueue<Action> m_queue = new MTQueue<Action>(); + AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); int m_queueLength = 0; readonly int m_threshold = 1;
--- a/Implab/Promise.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Promise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -1,954 +1,258 @@ -using System; -using System.Collections.Generic; -using System.Reflection; -using System.Threading; -using Implab.Parallels; - -namespace Implab { - - /// <summary> - /// Класс для асинхронного получения результатов. Так называемое "обещание". - /// </summary> - /// <typeparam name="T">Тип получаемого результата</typeparam> - /// <remarks> - /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции, - /// клиент получив такое обещание может установить ряд обратных вызово для получения - /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para> - /// <para> - /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на - /// данные события клиент должен использовать методы <c>Then</c>. - /// </para> - /// <para> - /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), - /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о - /// выполнении обещания. - /// </para> - /// <para> - /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался, - /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном - /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в - /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении - /// обещания. - /// </para> - /// <para> - /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать - /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует - /// использовать соответствующую форму методе <c>Then</c>. - /// </para> - /// <para> - /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать - /// только инициатор обещания иначе могут возникнуть противоречия. - /// </para> - /// </remarks> - public class Promise<T> : IPromise<T> { - - protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> { - public abstract void Resolve(T result); - public abstract void Reject(Exception error); - public abstract void Cancel(); - } - - protected class RemapDescriptor<T2> : AbstractHandler { - - readonly Func<T,T2> m_resultHandler; - readonly Func<Exception,T2> m_errorHandler; - readonly Action m_cancellHandler; - readonly Promise<T2> m_medium; - - public RemapDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) { - m_resultHandler = resultHandler; - m_errorHandler = errorHandler; - m_cancellHandler = cancelHandler; - m_medium = medium; - } - - public override void Resolve(T result) { - if (m_resultHandler != null) { - try { - if (m_medium != null) - m_medium.Resolve(m_resultHandler(result)); - else - m_resultHandler(result); - } catch (Exception e) { - Reject(e); - } - } else if(m_medium != null) - m_medium.Resolve(default(T2)); - } - - public override void Reject(Exception error) { - if (m_errorHandler != null) { - try { - var res = m_errorHandler(error); - if (m_medium != null) - m_medium.Resolve(res); - } catch (Exception err2) { - if (m_medium != null) - m_medium.Reject(err2); - } - } else if (m_medium != null) - m_medium.Reject(error); - } - - public override void Cancel() { - if (m_cancellHandler != null) { - try { - m_cancellHandler(); - } catch (Exception err) { - Reject(err); - return; - } - } - if (m_medium != null) - m_medium.Cancel(); - } - } - - protected class HandlerDescriptor : AbstractHandler { - - readonly Action<T> m_resultHandler; - readonly Action<Exception> m_errorHandler; - readonly Action m_cancellHandler; - readonly Promise<T> m_medium; - - public HandlerDescriptor(Action<T> resultHandler, Action<Exception> errorHandler, Action cancelHandler, Promise<T> medium) { - m_resultHandler = resultHandler; - m_errorHandler = errorHandler; - m_cancellHandler = cancelHandler; - m_medium = medium; - } - - public override void Resolve(T result) { - if (m_resultHandler != null) { - try { - m_resultHandler(result); - } catch (Exception e) { - Reject(e); - return; - } - } - if(m_medium != null) - m_medium.Resolve(result); - } - - public override void Reject(Exception error) { - if (m_errorHandler != null) { - try { - m_errorHandler(error); - if (m_medium != null) - m_medium.Resolve(default(T)); - } catch (Exception err2) { - if (m_medium != null) - m_medium.Reject(err2); - } - } else if (m_medium != null) - m_medium.Reject(error); - } - - public override void Cancel() { - if (m_cancellHandler != null) { - try { - m_cancellHandler(); - } catch (Exception err) { - Reject(err); - return; - } - } - if (m_medium != null) - m_medium.Cancel(); - } - } - - const int UNRESOLVED_SATE = 0; - const int TRANSITIONAL_STATE = 1; - const int SUCCEEDED_STATE = 2; - const int REJECTED_STATE = 3; - const int CANCELLED_STATE = 4; - - int m_childrenCount; - int m_state; - T m_result; - Exception m_error; - - readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>(); - //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>(); - - public Promise() { - } - - public Promise(IPromise parent) { - if (parent != null) - AddHandler( - null, - null, - () => { - if (parent.IsExclusive) - parent.Cancel(); - }, - null, - false - ); - } - - bool BeginTransit() { - return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); - } - - void CompleteTransit(int state) { - if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) - throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); - } - - void WaitTransition() { - while (m_state == TRANSITIONAL_STATE) { - Thread.MemoryBarrier(); - } - } - - public bool IsResolved { - get { - Thread.MemoryBarrier(); - return m_state > 1; - } - } - - public bool IsCancelled { - get { - Thread.MemoryBarrier(); - return m_state == CANCELLED_STATE; - } - } - - public Type PromiseType { - get { return typeof(T); } - } - - /// <summary> - /// Выполняет обещание, сообщая об успешном выполнении. - /// </summary> - /// <param name="result">Результат выполнения.</param> - /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> - public void Resolve(T result) { - if (BeginTransit()) { - m_result = result; - CompleteTransit(SUCCEEDED_STATE); - OnStateChanged(); - } else { - WaitTransition(); - if (m_state != CANCELLED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// <summary> - /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения. - /// </summary> - /// <remarks> - /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение. - /// </remarks> - public void Resolve() { - Resolve(default(T)); - } - - /// <summary> - /// Выполняет обещание, сообщая об ошибке - /// </summary> - /// <remarks> - /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков - /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные - /// будут проигнорированы. - /// </remarks> - /// <param name="error">Исключение возникшее при выполнении операции</param> - /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> - public void Reject(Exception error) { - if (BeginTransit()) { - m_error = error is TransientPromiseException ? error.InnerException : error; - CompleteTransit(REJECTED_STATE); - OnStateChanged(); - } else { - WaitTransition(); - if (m_state == SUCCEEDED_STATE) - throw new InvalidOperationException("The promise is already resolved"); - } - } - - /// <summary> - /// Отменяет операцию, если это возможно. - /// </summary> - /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> - public void Cancel() { - if (BeginTransit()) { - CompleteTransit(CANCELLED_STATE); - OnStateChanged(); - } - } - - /// <summary> - /// Последний обработчик в цепочки обещаний. - /// </summary> - /// <param name="success"></param> - /// <param name="error"></param> - /// <param name="cancel"></param> - /// <remarks> - /// <para> - /// Данный метод не создает связанного с текущим обещания и предназначен для окончания - /// фсинхронной цепочки. - /// </para> - /// <para> - /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка - /// не будет одиночной <see cref="IsExclusive"/> и, как следствие, будет невозможна отмена - /// всей цепи обещаний снизу (с самого последнего обещания). - /// </para> - /// </remarks> - public void On(Action<T> success, Action<Exception> error, Action cancel) { - if (success == null && error == null && cancel == null) - return; - - AddHandler(success, error, cancel, null, false); - } - - public void On(Action<T> success, Action<Exception> error) { - AddHandler(success, error, null, null, false); - } - - public void On(Action<T> success) { - AddHandler(success, null, null, null, false); - } - - public void On(Action handler, PromiseEventType events) { - Safe.ArgumentNotNull(handler, "handler"); - - - AddHandler( - events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null, - events.HasFlag(PromiseEventType.Error) ? new Action<Exception>( x => handler()) : null, - events.HasFlag(PromiseEventType.Cancelled) ? handler : null, - null, - false - ); - } - - public IPromise Error(Action<Exception> error) { - if (error == null) - return this; - - var medium = new Promise<T>(this); - - AddMappers( - x => x, - e => { - error(e); - return default(T); - }, - null, - medium, - true - ); - - return medium; - } - - /// <summary> - /// Handles error and allows to keep the promise. - /// </summary> - /// <remarks> - /// If the specified handler throws an exception, this exception will be used to reject the promise. - /// </remarks> - /// <param name="handler">The error handler which returns the result of the promise.</param> - /// <returns>New promise.</returns> - public IPromise<T> Error(Func<Exception,T> handler) { - if (handler == null) - return this; - - var medium = new Promise<T>(this); - - AddMappers(x => x, handler, null, medium, true); - - return medium; - } - - /// <summary> - /// Позволяет преобразовать результат выполения операции к новому типу. - /// </summary> - /// <typeparam name="TNew">Новый тип результата.</typeparam> - /// <param name="mapper">Преобразование результата к новому типу.</param> - /// <param name="error">Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении операции.</param> - /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns> - /// <param name = "cancel"></param> - public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error, Action cancel) { - Safe.ArgumentNotNull(mapper, "mapper"); - - // создаем прицепленное обещание - var medium = new Promise<TNew>(this); - - AddMappers( - mapper, - error, - cancel, - medium, - true - ); - - return medium; - } - - public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error) { - return Then(mapper, error, null); - } - - public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper) { - return Then(mapper, null, null); - } - - /// <summary> - /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после - /// выполнения текущей, а результат текущей операции может быть использован для инициализации - /// новой операции. - /// </summary> - /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam> - /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param> - /// <param name="error">Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении текуещй операции.</param> - /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns> - /// <param name = "cancel"></param> - public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error, Action cancel) { - - Safe.ArgumentNotNull(chained, "chained"); - - // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно - // создать посредника, к которому будут подвызяваться следующие обработчики. - // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы - // передать через него результаты работы. - var medium = new Promise<TNew>(this); - - Func<T,T> resultHandler = delegate(T result) { - if (medium.IsCancelled) - return default(T); - - var promise = chained(result); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.On( - null, - null, - () => { - if (promise.IsExclusive) - promise.Cancel(); - } - ); - - return default(T); - }; - - Func<Exception,T> errorHandler; - - if (error != null) - errorHandler = delegate(Exception e) { - try { - var promise = error(e); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - } catch (Exception e2) { - medium.Reject(e2); - } - return default(T); - }; - else - errorHandler = err => { - medium.Reject(err); - return default(T); - }; - - - Action cancelHandler; - if (cancel != null) - cancelHandler = () => { - if (cancel != null) - cancel(); - medium.Cancel(); - }; - else - cancelHandler = medium.Cancel; - - AddMappers( - resultHandler, - errorHandler, - cancelHandler, - null, - true - ); - - return medium; - } - - public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error) { - return Chain(chained, error, null); - } - - public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained) { - return Chain(chained, null, null); - } - - public IPromise<T> Cancelled(Action handler) { - var medium = new Promise<T>(this); - AddHandler(null, null, handler, medium, false); - return medium; - } - - /// <summary> - /// Adds the specified handler for all cases (success, error, cancel) - /// </summary> - /// <param name="handler">The handler that will be called anyway</param> - /// <returns>self</returns> - public IPromise<T> Anyway(Action handler) { - Safe.ArgumentNotNull(handler, "handler"); - - var medium = new Promise<T>(this); - - AddHandler( - x => handler(), - e => { - handler(); - throw new TransientPromiseException(e); - }, - handler, - medium, - true - ); - - return medium; - } - - /// <summary> - /// Преобразует результат обещания к нужному типу - /// </summary> - /// <typeparam name="T2"></typeparam> - /// <returns></returns> - public IPromise<T2> Cast<T2>() { - return Then(x => (T2)(object)x, null); - } - - /// <summary> - /// Дожидается отложенного обещания и в случае успеха, возвращает - /// его, результат, в противном случае бросает исключение. - /// </summary> - /// <remarks> - /// <para> - /// Если ожидание обещания было прервано по таймауту, это не значит, - /// что обещание было отменено или что-то в этом роде, это только - /// означает, что мы его не дождались, однако все зарегистрированные - /// обработчики, как были так остались и они будут вызваны, когда - /// обещание будет выполнено. - /// </para> - /// <para> - /// Такое поведение вполне оправдано поскольку таймаут может истечь - /// в тот момент, когда началась обработка цепочки обработчиков, и - /// к тому же текущее обещание может стоять в цепочке обещаний и его - /// отклонение может привести к непрогнозируемому результату. - /// </para> - /// </remarks> - /// <param name="timeout">Время ожидания</param> - /// <returns>Результат выполнения обещания</returns> - public T Join(int timeout) { - var evt = new ManualResetEvent(false); - Anyway(() => evt.Set()); - - if (!evt.WaitOne(timeout, true)) - throw new TimeoutException(); - - switch (m_state) { - case SUCCEEDED_STATE: - return m_result; - case CANCELLED_STATE: - throw new OperationCanceledException(); - case REJECTED_STATE: - throw new TargetInvocationException(m_error); - default: - throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); - } - } - - public T Join() { - return Join(Timeout.Infinite); - } - - void AddMappers<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) { - if (inc) - Interlocked.Increment(ref m_childrenCount); - - AbstractHandler handler = new RemapDescriptor<T2>(success, error, cancel, medium); - - bool queued; - - if (!IsResolved) { - m_handlers.Enqueue(handler); - queued = true; - } else { - // the promise is in resolved state, just invoke the handled with minimum overhead - queued = false; - InvokeHandler(handler); - } - - if (queued && IsResolved && m_handlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding handler to the queue - // we can't guarantee that someone is still processing it - // therefore we will fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } - - void AddHandler(Action<T> success, Action<Exception> error, Action cancel, Promise<T> medium, bool inc) { - if (inc) - Interlocked.Increment(ref m_childrenCount); - - AbstractHandler handler = new HandlerDescriptor(success, error, cancel, medium); - - bool queued; - - if (!IsResolved) { - m_handlers.Enqueue(handler); - queued = true; - } else { - // the promise is in resolved state, just invoke the handled with minimum overhead - queued = false; - InvokeHandler(handler); - } - - if (queued && IsResolved && m_handlers.TryDequeue(out handler)) - // if the promise have been resolved while we was adding handler to the queue - // we can't guarantee that someone is still processing it - // therefore we will fetch a handler from the queue and execute it - // note that fetched handler may be not the one that we have added - // even we can fetch no handlers at all :) - InvokeHandler(handler); - } - - protected virtual void InvokeHandler(AbstractHandler handler) { - switch (m_state) { - case SUCCEEDED_STATE: - handler.Resolve(m_result); - break; - case REJECTED_STATE: - handler.Reject(m_error); - break; - case CANCELLED_STATE: - handler.Cancel(); - break; - default: - // do nothing - return; - } - } - - void OnStateChanged() { - AbstractHandler handler; - while (m_handlers.TryDequeue(out handler)) - InvokeHandler(handler); - } - - public bool IsExclusive { - get { - return m_childrenCount <= 1; - } - } - - /// <summary> - /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний. - /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено. - /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан. - /// </summary> - /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param> - /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns> - /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception> - public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) { - if (promises == null) - throw new ArgumentNullException(); - - // создаем аккумулятор для результатов и результирующее обещание - var result = new T[promises.Count]; - var promise = new Promise<T[]>(); - - // special case - if (promises.Count == 0) { - promise.Resolve(result); - return promise; - } - - int pending = promises.Count; - - for (int i = 0; i < promises.Count; i++) { - var dest = i; - - if (promises[i] != null) { - promises[i].On( - x => { - result[dest] = x; - if (Interlocked.Decrement(ref pending) == 0) - promise.Resolve(result); - }, - promise.Reject - ); - } else { - if (Interlocked.Decrement(ref pending) == 0) - promise.Resolve(result); - } - } - - promise.Cancelled( - () => { - foreach (var d in promises) - if (d != null && d.IsExclusive) - d.Cancel(); - } - ); - - return promise; - } - - /// <summary> - /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при - /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний - /// игнорируются. - /// </summary> - /// <param name="promises">Коллекция первичных обещаний, которые будут объеденены в одно.</param> - /// <returns>Новое обещание, объединяющее в себе переданные.</returns> - /// <remarks> - /// Если в коллекции встречаюься <c>null</c>, то они воспринимаются как выполненные обещания. - /// </remarks> - public static IPromise CreateComposite(ICollection<IPromise> promises) { - if (promises == null) - throw new ArgumentNullException(); - if (promises.Count == 0) - return Promise<object>.ResultToPromise(null); - - int countdown = promises.Count; - - var result = new Promise<object>(); - - foreach (var d in promises) { - if (d == null) { - if (Interlocked.Decrement(ref countdown) == 0) - result.Resolve(null); - } else { - d.Then(() => { - if (Interlocked.Decrement(ref countdown) == 0) - result.Resolve(null); - }); - } - } - - result.Cancelled(() => { - foreach (var d in promises) - if (d != null && d.IsExclusive) - d.Cancel(); - }); - - return result; - } - - public static Promise<T> ResultToPromise(T result) { - var p = new Promise<T>(); - p.Resolve(result); - return p; - } - - public static Promise<T> ExceptionToPromise(Exception error) { - if (error == null) - throw new ArgumentNullException(); - - var p = new Promise<T>(); - p.Reject(error); - return p; - } - - #region IPromiseBase explicit implementation - - IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) { - return Then( - success != null ? new Func<T,T>(x => { - success(); - return x; - }) : null, - error != null ? new Func<Exception,T>(e => { - error(e); - return default(T); - }) : null, - cancel - ); - } - - IPromise IPromise.Then(Action success, Action<Exception> error) { - return Then( - success != null ? new Func<T,T>(x => { - success(); - return x; - }) : null, - error != null ? new Func<Exception,T>(e => { - error(e); - return default(T); - }) : null - ); - } - - IPromise IPromise.Then(Action success) { - Safe.ArgumentNotNull(success, "success"); - return Then(x => { - success(); - return x; - }); - } - - IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) { - return ChainNoResult(chained, error, cancel); - } - - IPromise ChainNoResult(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) { - Safe.ArgumentNotNull(chained, "chained"); - - var medium = new Promise<object>(this); - - Func<T,T> resultHandler = delegate { - if (medium.IsCancelled) - return default(T); - - var promise = chained(); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - - return default(T); - }; - - Func<Exception,T> errorHandler; - - if (error != null) - errorHandler = delegate(Exception e) { - try { - var promise = error(e); - - promise.On( - medium.Resolve, - medium.Reject, - () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка - ); - - // notify chained operation that it's not needed anymore - // порядок вызова Then, Cancelled важен, поскольку от этого - // зависит IsExclusive - medium.Cancelled(() => { - if (promise.IsExclusive) - promise.Cancel(); - }); - } catch (Exception e2) { - medium.Reject(e2); - } - return default(T); - }; - else - errorHandler = err => { - medium.Reject(err); - return default(T); - }; - - - Action cancelHandler; - if (cancel != null) - cancelHandler = () => { - if (cancel != null) - cancel(); - medium.Cancel(); - }; - else - cancelHandler = medium.Cancel; - - AddMappers( - resultHandler, - errorHandler, - cancelHandler, - null, - true - ); - - return medium; - } - - IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error) { - return ChainNoResult(chained, error, null); - } - - IPromise IPromise.Chain(Func<IPromise> chained) { - return ChainNoResult(chained, null, null); - } - - - void IPromise.On(Action success, Action<Exception> error, Action cancel) { - On(success != null ? new Action<T>(x => success()) : null, error, cancel); - } - - void IPromise.On(Action success, Action<Exception> error) { - On(x => success(), error, null); - } - - void IPromise.On(Action success) { - On(x => success(), null, null); - } - - IPromise IPromise.Error(Action<Exception> error) { - return Error(error); - } - - IPromise IPromise.Anyway(Action handler) { - return Anyway(handler); - } - - IPromise IPromise.Cancelled(Action handler) { - return Cancelled(handler); - } - - void IPromise.Join() { - Join(); - } - - void IPromise.Join(int timeout) { - Join(timeout); - } - - #endregion - - - - } -} +using System; +using System.Diagnostics; + +namespace Implab { + public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred { + + public struct HandlerDescriptor { + readonly Action m_success; + readonly Action<Exception> m_error; + readonly Action m_cancel; + readonly IDeferred m_deferred; + + public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) { + m_success = success; + m_error = error; + m_cancel = cancel; + m_deferred = deferred; + } + + public void SignalSuccess() { + if (m_success != null) { + try { + m_success(); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err) { + SignalError(err); + } + } + } + + public void SignalError(Exception err) { + if (m_error != null) { + try { + m_error(err); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err2) { + if (m_deferred != null) + m_deferred.Reject(err2); + } + } else { + if (m_deferred != null) + m_deferred.Reject(err); + } + } + + public void SignalCancel() { + if (m_cancel != null) { + try { + m_cancel(); + if (m_deferred != null) + m_deferred.Resolve(); + } catch (Exception err) { + SignalError(err); + } + } else { + if (m_deferred != null) + m_deferred.Cancel(); + } + } + } + + public void Resolve() { + BeginSetResult(); + EndSetResult(); + } + + public void Reject(Exception error) { + SetError(error); + } + + #region implemented abstract members of AbstractPromise + + protected override void SignalSuccess(HandlerDescriptor handler) { + handler.SignalSuccess(); + } + + protected override void SignalError(HandlerDescriptor handler, Exception error) { + handler.SignalError(error); + } + + protected override void SignalCancelled(HandlerDescriptor handler) { + handler.SignalCancel(); + } + + protected override void Listen(PromiseEventType events, Action handler) { + AddHandler(new HandlerDescriptor( + events.HasFlag(PromiseEventType.Success) ? handler : null, + events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null, + events.HasFlag(PromiseEventType.Cancelled) ? handler : null, + null + )); + } + + #endregion + + + public Type PromiseType { + get { + return typeof(void); + } + } + + public IPromise Then(Action success, Action<Exception> error, Action cancel) { + var promise = new Promise(); + if (success != null) + promise.On(Cancel, PromiseEventType.Cancelled); + + AddHandler(new HandlerDescriptor(success, error, cancel, promise)); + + return promise; + } + + public IPromise Then(Action success, Action<Exception> error) { + return Then(success, error, null); + } + + public IPromise Then(Action success) { + return Then(success, null, null); + } + + public IPromise On(Action success, Action<Exception> error, Action cancel) { + AddHandler(new HandlerDescriptor(success, error, cancel, null)); + return this; + } + + public IPromise On(Action success, Action<Exception> error) { + return On(success, error, null); + } + + public IPromise On(Action success) { + return On(success, null, null); + } + + public IPromise On(Action handler, PromiseEventType events) { + return On( + events.HasFlag(PromiseEventType.Success) ? handler : null, + events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null, + events.HasFlag(PromiseEventType.Cancelled) ? handler : null + ); + } + + public IPromise<T> Cast<T>() { + throw new InvalidCastException(); + } + + public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) { + var medium = new Promise(); + + On( + () => { + if (medium.IsCancelled) + return; + if (chained != null) + ConnectPromise(chained(), medium); + }, + ex => { + if (medium.IsCancelled) + return; + if (error != null) { + try { + ConnectPromise(error(ex), medium); + } catch (Exception ex2) { + medium.Reject(ex2); + } + } else { + medium.Reject(ex); + } + }, + () => { + if (medium.IsCancelled) + return; + if (cancel != null) + ConnectPromise(cancel(), medium); + else + medium.Cancel(); + } + ); + + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + return medium; + } + + static void ConnectPromise(IPromise result, Promise medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) { + return Chain(chained, error, null); + } + + public IPromise Chain(Func<IPromise> chained) { + return Chain(chained, null, null); + } + + public IPromise Error(Action<Exception> error) { + var promise = new Promise(); + On( + null, + err => { + if (error != null) + try { + error(err); + promise.Resolve(); + } catch (Exception err2) { + promise.Reject(err2); + } + else + promise.Reject(err); + } + ); + + return promise; + } + + public IPromise Cancelled(Action handler) { + var promise = new Promise(); + On( + null, + null, + () => { + if (handler != null) { + try { + handler(); + promise.Resolve(); + } catch (Exception err) { + promise.Reject(err); + } + } else { + promise.Cancel(); + } + } + ); + + return promise; + } + + + } +} +
--- a/Implab/PromiseExtensions.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/PromiseExtensions.cs Sun Jan 11 19:13:02 2015 +0300 @@ -1,6 +1,7 @@ using System.Threading; using System; using Implab.Diagnostics; +using System.Collections.Generic; #if NET_4_5 @@ -15,7 +16,8 @@ if (context == null) return that; - var p = new SyncContextPromise<T>(context, that); + var p = new SyncContextPromise<T>(context); + p.On(that.Cancel, PromiseEventType.Cancelled); that.On( p.Resolve, @@ -29,7 +31,9 @@ Safe.ArgumentNotNull(that, "that"); Safe.ArgumentNotNull(context, "context"); - var p = new SyncContextPromise<T>(context, that); + var p = new SyncContextPromise<T>(context); + p.On(that.Cancel, PromiseEventType.Cancelled); + that.On( p.Resolve, @@ -89,6 +93,29 @@ that.On(timer.Dispose, PromiseEventType.All); return that; } + + public static IPromise Combine(this ICollection<IPromise> that) { + Safe.ArgumentNotNull(that, "that"); + + int count = that.Count; + var medium = new Promise(); + + foreach (var p in that) + p.On( + () => { + if (Interlocked.Decrement(ref count) == 0) + medium.Resolve(); + }, + error => { + throw new Exception("The dependency promise is failed", error); + }, + () => { + throw new OperationCanceledException("The dependency promise is cancelled"); + } + ); + + return medium; + } #if NET_4_5
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseT.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,621 @@ +using System; +using System.Diagnostics; + +namespace Implab { + + /// <summary> + /// Класс для асинхронного получения результатов. Так называемое "обещание". + /// </summary> + /// <typeparam name="T">Тип получаемого результата</typeparam> + /// <remarks> + /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции, + /// клиент получив такое обещание может установить ряд обратных вызово для получения + /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para> + /// <para> + /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на + /// данные события клиент должен использовать методы <c>Then</c>. + /// </para> + /// <para> + /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), + /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о + /// выполнении обещания. + /// </para> + /// <para> + /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался, + /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном + /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в + /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении + /// обещания. + /// </para> + /// <para> + /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать + /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует + /// использовать соответствующую форму методе <c>Then</c>. + /// </para> + /// <para> + /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать + /// только инициатор обещания иначе могут возникнуть противоречия. + /// </para> + /// </remarks> + public class Promise<T> : AbstractPromise<IDeferred<T>>, IPromise<T>, IDeferred<T> { + + class StubDeferred : IDeferred<T> { + public static readonly StubDeferred instance = new StubDeferred(); + + StubDeferred() { + } + + #region IDeferred implementation + + public void Resolve(T value) { + } + + public void Reject(Exception error) { + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + } + + #endregion + + + } + + class RemapDescriptor<T2> : IDeferred<T> { + readonly Func<T,T2> m_remap; + readonly Func<Exception,T2> m_failed; + readonly Func<T2> m_cancel; + readonly IDeferred<T2> m_deferred; + + public RemapDescriptor(Func<T,T2> remap, Func<Exception,T2> failed, Func<T2> cancel, IDeferred<T2> deferred ) { + Debug.Assert(deferred != null); + m_remap = remap; + m_failed = failed; + m_cancel = cancel; + m_deferred = deferred; + } + + + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_remap != null) { + try { + m_deferred.Resolve(m_remap(value)); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_deferred.Resolve(m_failed(error)); + } catch (Exception ex) { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + } + + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancel != null) { + try { + m_deferred.Resolve(m_cancel()); + } catch (Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + class ListenerDescriptor : IDeferred<T> { + readonly Action m_handler; + readonly PromiseEventType m_events; + + public ListenerDescriptor(Action handler, PromiseEventType events) { + Debug.Assert(handler != null); + + m_handler = handler; + m_events = events; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_events.HasFlag(PromiseEventType.Success)) { + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + public void Reject(Exception error) { + if (m_events.HasFlag(PromiseEventType.Error)){ + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_events.HasFlag(PromiseEventType.Cancelled)){ + try { + m_handler(); + // Analysis disable once EmptyGeneralCatchClause + } catch { + } + } + } + + #endregion + } + + class ValueEventDescriptor : IDeferred<T> { + readonly Action<T> m_success; + readonly Action<Exception> m_failed; + readonly Action m_cancelled; + readonly IDeferred<T> m_deferred; + + public ValueEventDescriptor(Action<T> success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) { + Debug.Assert(deferred != null); + + m_success = success; + m_failed = failed; + m_cancelled = cancelled; + m_deferred = deferred; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_success != null) { + try { + m_success(value); + m_deferred.Resolve(value); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_failed(error); + m_deferred.Resolve(default(T)); + } catch(Exception ex) { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancelled != null) { + try { + m_cancelled(); + m_deferred.Resolve(default(T)); + } catch(Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + public class EventDescriptor : IDeferred<T> { + readonly Action m_success; + readonly Action<Exception> m_failed; + readonly Action m_cancelled; + readonly IDeferred<T> m_deferred; + + public EventDescriptor(Action success, Action<Exception> failed, Action cancelled, IDeferred<T> deferred) { + Debug.Assert(deferred != null); + + m_success = success; + m_failed = failed; + m_cancelled = cancelled; + m_deferred = deferred; + } + + #region IDeferred implementation + + public void Resolve(T value) { + if (m_success != null) { + try { + m_success(); + m_deferred.Resolve(value); + } catch (Exception ex) { + Reject(ex); + } + } + } + + public void Reject(Exception error) { + if (m_failed != null) { + try { + m_failed(error); + m_deferred.Resolve(default(T)); + }catch (Exception ex) + { + m_deferred.Reject(ex); + } + } else { + m_deferred.Reject(error); + } + + } + + #endregion + + #region ICancellable implementation + + public void Cancel() { + if (m_cancelled != null) { + try { + m_cancelled(); + m_deferred.Resolve(default(T)); + } catch (Exception ex) { + Reject(ex); + } + } else { + m_deferred.Cancel(); + } + } + + #endregion + } + + T m_result; + + public virtual void Resolve(T value) { + BeginSetResult(); + m_result = value; + EndSetResult(); + } + + public void Reject(Exception error) { + SetError(error); + } + + public Type PromiseType { + get { + return typeof(T); + } + } + + public new T Join() { + WaitResult(-1); + return m_result; + } + public new T Join(int timeout) { + WaitResult(timeout); + return m_result; + } + + public IPromise<T> On(Action<T> success, Action<Exception> error, Action cancel) { + AddHandler(new ValueEventDescriptor(success, error, cancel, StubDeferred.instance)); + return this; + } + + public IPromise<T> On(Action<T> success, Action<Exception> error) { + AddHandler(new ValueEventDescriptor(success, error, null, StubDeferred.instance)); + return this; + } + + public IPromise<T> On(Action<T> success) { + AddHandler(new ValueEventDescriptor(success, null, null, StubDeferred.instance)); + return this; + } + + public IPromise<T> On(Action handler, PromiseEventType events) { + Listen(events, handler); + return this; + } + + public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error, Func<T2> cancel) { + var promise = new Promise<T2>(); + AddHandler(new RemapDescriptor<T2>(mapper, error, cancel, promise)); + return promise; + } + + public IPromise<T2> Then<T2>(Func<T, T2> mapper, Func<Exception, T2> error) { + var promise = new Promise<T2>(); + AddHandler(new RemapDescriptor<T2>(mapper, error, null, promise)); + return promise; + } + + public IPromise<T2> Then<T2>(Func<T, T2> mapper) { + var promise = new Promise<T2>(); + AddHandler(new RemapDescriptor<T2>(mapper, null, null, promise)); + return promise; + } + + public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error, Func<IPromise<T2>> cancel) { + // this promise will be resolved when an asyc operation is started + var promise = new Promise<IPromise<T2>>(); + + AddHandler(new RemapDescriptor<IPromise<T2>>( + chained, + error, + cancel, + promise + )); + + var medium = new Promise<T2>(); + + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + // we need to connect started async operation with the medium + // if the async operation hasn't been started by the some reason + // report is to the medium + promise.On( + result => ConnectPromise<T2>(result, medium), + medium.Reject, + medium.Cancel + ); + + return medium; + } + + static void ConnectPromise<T2>(IPromise<T2> result, Promise<T2> medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained, Func<Exception, IPromise<T2>> error) { + return Chain(chained, error, null); + } + + public IPromise<T2> Chain<T2>(Func<T, IPromise<T2>> chained) { + return Chain(chained, null, null); + } + + public IPromise<T2> Error<T2>(Func<Exception, T2> error) { + var promise = new Promise<T2>(); + if (error != null) + On( + (Action<T>)null, + ex => { + try { + promise.Resolve(error(ex)); + } catch (Exception ex2) { + promise.Reject(ex2); + } + } + ); + else + Listen(PromiseEventType.Error, () => promise.Resolve(default(T2))); + return promise; + } + + public IPromise<T2> Cancelled<T2>(Func<T2> handler) { + var promise = new Promise<T2>(); + if (handler != null) + On( + (Action<T>)null, + null, + () => { + try { + promise.Resolve(handler()); + } catch (Exception ex) { + promise.Reject(ex); + } + }); + else + Listen(PromiseEventType.Cancelled, () => promise.Resolve(default(T2))); + return promise; + } + + public IPromise Then(Action success, Action<Exception> error, Action cancel) { + var promise = new Promise<T>(); + if (success != null) + promise.On(Cancel, PromiseEventType.Cancelled); + + AddHandler(new EventDescriptor(success, error, cancel, promise)); + + return promise; + } + + public IPromise Then(Action success, Action<Exception> error) { + return Then(success, error, null); + } + + public IPromise Then(Action success) { + return Then(success, null, null); + } + + public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) { + var promise = new Promise<IPromise>(); + + AddHandler( + new RemapDescriptor<IPromise>( + x => chained(), + error, + cancel, + promise + ) + ); + + var medium = new Promise(); + if (chained != null) + medium.On(Cancel, PromiseEventType.Cancelled); + + promise.On( + result => ConnectPromise(result, medium), + medium.Reject, + medium.Cancel + ); + + return medium; + } + + static void ConnectPromise(IPromise result, Promise medium) { + if (result != null) { + result.On( + medium.Resolve, + medium.Reject, + () => medium.Reject(new OperationCanceledException()) + ); + medium.On(result.Cancel, PromiseEventType.Cancelled); + } else { + medium.Reject( + new NullReferenceException( + "The chained asynchronous operation returned" + + " 'null' where the promise instance is expected" + ) + ); + } + } + + public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) { + return Chain(chained, error, null); + } + + public IPromise Chain(Func<IPromise> chained) { + return Chain(chained, null, null); + } + + public IPromise On(Action success, Action<Exception> error, Action cancel) { + AddHandler(new EventDescriptor(success,error,cancel, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success, Action<Exception> error) { + AddHandler(new EventDescriptor(success, error, null, StubDeferred.instance)); + return this; + } + + public IPromise On(Action success) { + Listen(PromiseEventType.Success, success); + return this; + } + + IPromise IPromise.On(Action handler, PromiseEventType events) { + Listen(events,handler); + return this; + } + + public IPromise Error(Action<Exception> error) { + var promise = new Promise(); + if (error != null) + On( + (Action<T>)null, + ex => { + try { + error(ex); + promise.Resolve(); + } catch (Exception ex2) { + promise.Reject(ex2); + } + }); + else + Listen(PromiseEventType.Error, promise.Resolve); + return promise; + } + + public IPromise Cancelled(Action handler) { + var promise = new Promise(); + if (handler != null) + On( + (Action<T>)null, + null, + () => { + try { + handler(); + promise.Resolve(); + } catch (Exception ex) { + promise.Reject(ex); + } + }); + else + Listen(PromiseEventType.Cancelled, promise.Resolve); + return promise; + } + + public IPromise<T2> Cast<T2>() { + return (IPromise<T2>)this; + } + + #region implemented abstract members of AbstractPromise + + protected override void SignalSuccess(IDeferred<T> handler) { + handler.Resolve(m_result); + } + + protected override void SignalError(IDeferred<T> handler, Exception error) { + handler.Reject(error); + } + + protected override void SignalCancelled(IDeferred<T> handler) { + handler.Cancel(); + } + + protected override void Listen(PromiseEventType events, Action handler) { + if (handler != null) + AddHandler(new ListenerDescriptor(handler, events)); + } + + #endregion + + public static IPromise<T> ResultToPromise(T value) { + var p = new Promise<T>(); + p.Resolve(value); + return p; + } + + public static IPromise<T> ExceptionToPromise(Exception error) { + var p = new Promise<T>(); + p.Reject(error); + return p; + } + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseTransientException.cs Sun Jan 11 19:13:02 2015 +0300 @@ -0,0 +1,33 @@ +using System; + +namespace Implab { + + [Serializable] + public class PromiseTransientException : Exception { + /// <summary> + /// Initializes a new instance of the <see cref="PromiseTransientException"/> class. + /// </summary> + /// <param name="inner">The exception that is the cause of the current exception.</param> + public PromiseTransientException(Exception inner) : base("The preceding promise has failed", inner) { + } + + /// <summary> + /// Initializes a new instance of the <see cref="PromiseTransientException"/> class + /// </summary> + /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param> + /// <param name="inner">The exception that is the cause of the current exception. </param> + public PromiseTransientException(string message, Exception inner) + : base(message, inner) { + } + + /// <summary> + /// Initializes a new instance of the <see cref="PromiseTransientException"/> class + /// </summary> + /// <param name="context">The contextual information about the source or destination.</param> + /// <param name="info">The object that holds the serialized object data.</param> + protected PromiseTransientException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) + : base(info, context) { + } + } +} +
--- a/Implab/Safe.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/Safe.cs Sun Jan 11 19:13:02 2015 +0300 @@ -23,7 +23,7 @@ public static void ArgumentNotEmpty<T>(T[] param, string name) { if (param == null || param.Length == 0) - throw new ArgumentException("The array must be not emty"); + throw new ArgumentException("The array must be not emty", name); } public static void ArgumentNotNull(object param, string name) { @@ -61,7 +61,7 @@ public static IPromise InvokePromise(Action action) { ArgumentNotNull(action, "action"); - var p = new Promise<object>(); + var p = new Promise(); try { action(); p.Resolve();
--- a/Implab/SyncContextPromise.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/Implab/SyncContextPromise.cs Sun Jan 11 19:13:02 2015 +0300 @@ -9,13 +9,16 @@ m_context = context; } - public SyncContextPromise(SynchronizationContext context, IPromise parent) - : base(parent) { - Safe.ArgumentNotNull(context, "context"); - m_context = context; + protected override void SignalSuccess(IDeferred<T> handler) { + m_context.Post(x => base.SignalSuccess(handler), null); } - protected override void InvokeHandler(AbstractHandler handler) { - m_context.Post(x => base.InvokeHandler(handler),null); + + protected override void SignalError(IDeferred<T> handler, System.Exception error) { + m_context.Post(x => base.SignalError(handler, error), null); + } + + protected override void SignalCancelled(IDeferred<T> handler) { + m_context.Post(x => base.SignalCancelled(handler), null); } } }
--- a/Implab/TransientPromiseException.cs Sun Dec 28 16:09:03 2014 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,33 +0,0 @@ -using System; - -namespace Implab { - - [Serializable] - public class TransientPromiseException : Exception { - /// <summary> - /// Initializes a new instance of the <see cref="PromiseFailedException"/> class. - /// </summary> - /// <param name="inner">The exception that is the cause of the current exception.</param> - public TransientPromiseException(Exception inner) : base("The preceding promise has failed", inner) { - } - - /// <summary> - /// Initializes a new instance of the <see cref="PromiseFailedException"/> class - /// </summary> - /// <param name="message">A <see cref="T:System.String"/> that describes the exception. </param> - /// <param name="inner">The exception that is the cause of the current exception. </param> - public TransientPromiseException(string message, Exception inner) - : base(message, inner) { - } - - /// <summary> - /// Initializes a new instance of the <see cref="PromiseFailedException"/> class - /// </summary> - /// <param name="context">The contextual information about the source or destination.</param> - /// <param name="info">The object that holds the serialized object data.</param> - protected TransientPromiseException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) - : base(info, context) { - } - } -} -
--- a/MonoPlay/Program.cs Sun Dec 28 16:09:03 2014 +0300 +++ b/MonoPlay/Program.cs Sun Jan 11 19:13:02 2015 +0300 @@ -11,7 +11,7 @@ if (args == null) throw new ArgumentNullException("args"); - var q1 = new MTQueue<int>(); + var q1 = new AsyncQueue<int>(); var q2 = new Queue<int>(); const int count = 10000000; @@ -19,60 +19,91 @@ var t1 = Environment.TickCount; - Promise<int>.CreateComposite( - new [] { - AsyncPool.InvokeNewThread(() => { - for (var i = 0; i < count; i++) - q1.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { - int temp = 0; - for(int i =0 ; i< count ; i++) - while(!q1.TryDequeue(out temp)){ - } - }) - } - ).Join(); + new [] { + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + q1.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + q1.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + int temp = 0; + int i = 0; + while (i < count) + if (q1.TryDequeue(out temp)) + i++; + }), + AsyncPool.InvokeNewThread(() => { + int temp = 0; + int i = 0; + while (i < count) + if (q1.TryDequeue(out temp)) + i++; + }) + } + .Combine() + .Join(); var t2 = Environment.TickCount; Console.WriteLine("MTQueue: {0} ms", t2 - t1); t1 = Environment.TickCount; - for (var i = 0; i < count; i++) + for (var i = 0; i < count * 2; i++) q2.Enqueue(i); + for (var i = 0; i < count * 2; i++) + q2.Dequeue(); + t2 = Environment.TickCount; - Console.WriteLine("LinkedList: {0} ms", t2 - t1); + Console.WriteLine("Queue: {0} ms", t2 - t1); q2 = new Queue<int>(); t1 = Environment.TickCount; - Promise<int>.CreateComposite( - new [] { - AsyncPool.InvokeNewThread(() => { - for (var i = 0; i < count; i++) - lock (q2) - q2.Enqueue(i); - }), - AsyncPool.InvokeNewThread(() => { - for(int i = 0 ; i< count ;) - lock(q2) { - if(q2.Count == 0) - continue; - q2.Dequeue(); - i++; - } + + new [] { + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + lock (q2) + q2.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (var i = 0; i < count; i++) + lock (q2) + q2.Enqueue(i); + }), + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < count ;) + lock (q2) { + if (q2.Count == 0) + continue; + q2.Dequeue(); + i++; + } - }) - } - ).Join(); + }), + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < count ;) + lock (q2) { + if (q2.Count == 0) + continue; + q2.Dequeue(); + i++; + } + + }) + } + .Combine() + .Join(); t2 = Environment.TickCount; - Console.WriteLine("LinkedList+Lock: {0} ms", t2 - t1); + Console.WriteLine("Queue+Lock: {0} ms", t2 - t1); } }