view Implab/PromiseExtensions.cs @ 149:eb793fbbe4ea v2

fixed promises cancellation
author cin
date Wed, 06 May 2015 17:11:27 +0300
parents 706fccb85524
children ec91a6dfa5b3
line wrap: on
line source

using System.Threading;
using System;
using Implab.Diagnostics;
using System.Collections.Generic;


#if NET_4_5
using System.Threading.Tasks;
#endif

namespace Implab {
    public static class PromiseExtensions {
        /// <summary>
        /// Mirrors the promise on a <see cref="SyncContextPromise{T}"/> created for the
        /// synchronization context which is current for the calling thread.
        /// </summary>
        /// <remarks>
        /// If the calling thread has no synchronization context the original promise is
        /// returned as is. Otherwise the returned promise receives the result, the error or
        /// the cancellation of <paramref name="that"/>, and its own cancellation is forwarded
        /// back to <paramref name="that"/>.
        /// </remarks>
        /// <param name="that">The source promise.</param>
        /// <typeparam name="T">The type of the promise result.</typeparam>
        /// <returns>The original promise or the promise created for the current context.</returns>
        public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) {
            Safe.ArgumentNotNull(that, "that");

            var current = SynchronizationContext.Current;
            if (current == null) {
                // there is no context to dispatch to
                return that;
            }

            var proxy = new SyncContextPromise<T>(current);

            // cancelling the proxy cancels the source promise
            proxy.On(that.Cancel, PromiseEventType.Cancelled);

            // the outcome of the source promise is mirrored on the proxy
            that.On(proxy.Resolve, proxy.Reject, proxy.Cancel);

            return proxy;
        }

        /// <summary>
        /// Mirrors the promise on a <see cref="SyncContextPromise{T}"/> created for the
        /// specified synchronization context.
        /// </summary>
        /// <remarks>
        /// The returned promise receives the result, the error or the cancellation of
        /// <paramref name="that"/>; its own cancellation is forwarded back to
        /// <paramref name="that"/>.
        /// </remarks>
        /// <param name="that">The source promise.</param>
        /// <param name="context">The synchronization context for the new promise.</param>
        /// <typeparam name="T">The type of the promise result.</typeparam>
        /// <returns>The promise created for <paramref name="context"/>.</returns>
        public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) {
            Safe.ArgumentNotNull(that, "that");
            Safe.ArgumentNotNull(context, "context");

            var proxy = new SyncContextPromise<T>(context);

            // cancelling the proxy cancels the source promise
            proxy.On(that.Cancel, PromiseEventType.Cancelled);

            // the outcome of the source promise is mirrored on the proxy
            that.On(proxy.Resolve, proxy.Reject, proxy.Cancel);

            return proxy;
        }

        /// <summary>
        /// Arranges for <paramref name="cleanup"/> to be attached to <paramref name="head"/>
        /// when <paramref name="that"/> is cancelled.
        /// </summary>
        /// <remarks>
        /// Nothing is attached to <paramref name="head"/> unless <paramref name="that"/> raises
        /// the <c>Cancelled</c> event. NOTE(review): <c>head.On(cleanup)</c> is presumably the
        /// success-handler overload, i.e. the cleanup receives the value produced by
        /// <paramref name="head"/> - confirm against <c>IPromise{T}.On</c>.
        /// </remarks>
        /// <returns>The same <paramref name="that"/> instance, to allow call chaining.</returns>
        /// <param name="that">The promise whose cancellation is observed.</param>
        /// <param name="head">The promise the cleanup handler is attached to.</param>
        /// <param name="cleanup">The handler passed to <c>head.On</c>.</param>
        /// <typeparam name="TPromise">The type of the observed promise.</typeparam>
        /// <typeparam name="T">The type of the result of <paramref name="head"/>.</typeparam>
        public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{
            Safe.ArgumentNotNull(that, "that");
            Safe.ArgumentNotNull(head, "head");

            // the cleanup is hooked up lazily, only after the cancellation has happened
            Action attachCleanup = () => head.On(cleanup);
            that.On(attachCleanup, PromiseEventType.Cancelled);

            return that;
        }

        /// <summary>
        /// Creates an <see cref="System.AsyncCallback"/> which completes the promise with the
        /// value computed by <paramref name="callback"/>.
        /// </summary>
        /// <remarks>
        /// The trace operation which is current when this method is called is captured and
        /// entered again for the duration of the callback. Any exception thrown while the
        /// result is computed or the promise is resolved is passed to <c>Reject</c>.
        /// </remarks>
        /// <param name="that">The promise to complete.</param>
        /// <param name="callback">Converts the async result to the promise value.</param>
        /// <typeparam name="T">The type of the promise result.</typeparam>
        /// <returns>The callback to pass to a Begin/End style asynchronous operation.</returns>
        public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) {
            Safe.ArgumentNotNull(that, "that");
            Safe.ArgumentNotNull(callback, "callback");

            // captured now, entered again when the callback is invoked
            var operation = TraceContext.Instance.CurrentOperation;

            return asyncResult => {
                TraceContext.Instance.EnterLogicalOperation(operation, false);
                try {
                    var value = callback(asyncResult);
                    that.Resolve(value);
                } catch (Exception ex) {
                    that.Reject(ex);
                } finally {
                    TraceContext.Instance.Leave();
                }
            };
        }

        /// <summary>
        /// Timer callback: casts the state object to <c>ICancellable</c> and cancels it.
        /// </summary>
        /// <param name="cookie">The state object supplied to the timer.</param>
        static void CancelCallback(object cookie) {
            var target = (ICancellable)cookie;
            target.Cancel();
        }

        /// <summary>
        /// Cancels the promise after the specified timeout has elapsed.
        /// </summary>
        /// <remarks>
        /// A one-shot timer is started immediately; it is disposed when the promise raises
        /// any of the events covered by <c>PromiseEventType.All</c>.
        /// </remarks>
        /// <param name="that">The promise to cancel on timeout.</param>
        /// <param name="milliseconds">The timeout in milliseconds.</param>
        /// <typeparam name="TPromise">The type of the promise.</typeparam>
        /// <returns>The same <paramref name="that"/> instance, to allow call chaining.</returns>
        public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise {
            Safe.ArgumentNotNull(that, "that");

            // the infinite period makes the timer fire only once
            var watchdog = new Timer(CancelCallback, that, milliseconds, System.Threading.Timeout.Infinite);

            // the timer is released as soon as the promise is completed
            that.On(watchdog.Dispose, PromiseEventType.All);

            return that;
        }

        /// <summary>
        /// Combines a collection of promises into a single promise which is resolved once
        /// every promise of the collection has been resolved.
        /// </summary>
        /// <remarks>
        /// The first dependency which fails rejects the combined promise and the first
        /// dependency which is cancelled cancels it; in both cases the original error or
        /// cancellation reason is preserved as the inner exception. When the combined promise
        /// fails or is cancelled, <c>Cancel</c> is called on every promise of the collection.
        /// An empty collection yields an already resolved promise.
        /// </remarks>
        /// <param name="that">The promises to wait for.</param>
        /// <returns>The promise which tracks the completion of the whole collection.</returns>
        public static IPromise Bundle(this ICollection<IPromise> that) {
            Safe.ArgumentNotNull(that, "that");

            int count = that.Count;
            int errors = 0;
            var medium = new Promise();

            if (count == 0) {
                // nothing to wait for
                medium.Resolve();
                return medium;
            }

            // once the bundle has failed or has been cancelled the dependencies are cancelled too
            medium.On(() => {
                foreach(var p2 in that)
                    p2.Cancel();
            }, PromiseEventType.ErrorOrCancel);

            foreach (var p in that)
                p.On(
                    () => {
                        // the last resolved dependency completes the bundle
                        if (Interlocked.Decrement(ref count) == 0)
                            medium.Resolve();
                    },
                    error => {
                        // only the first failure or cancellation is reported
                        if (Interlocked.Increment(ref errors) == 1)
                            medium.Reject(
                                new Exception("The dependency promise is failed", error)
                            );
                    },
                    reason => {
                        // the reason is kept as the inner exception, as in the generic overload
                        if (Interlocked.Increment(ref errors) == 1)
                            medium.Cancel(
                                new Exception("The dependency promise is cancelled", reason)
                            );
                    }
                );

            return medium;
        }

        /// <summary>
        /// Combines a collection of promises into a single promise which is resolved with
        /// the array of their results once every promise of the collection has been resolved.
        /// </summary>
        /// <remarks>
        /// The results are stored in the enumeration order of the collection. The first
        /// dependency which fails rejects the combined promise and the first dependency which
        /// is cancelled cancels it; the original error or cancellation reason is preserved as
        /// the inner exception. When the combined promise fails or is cancelled,
        /// <c>Cancel</c> is called on every promise of the collection. An empty collection
        /// yields a promise which is already resolved with an empty array.
        /// </remarks>
        /// <param name="that">The promises to wait for.</param>
        /// <typeparam name="T">The type of the result of each promise.</typeparam>
        /// <returns>The promise of the array of results.</returns>
        public static IPromise<T[]> Bundle<T>(this ICollection<IPromise<T>> that) {
            Safe.ArgumentNotNull(that, "that");

            int count = that.Count;
            int errors = 0;
            var medium = new Promise<T[]>();
            var results = new T[that.Count];

            if (count == 0) {
                // nothing to wait for: without this no handler would ever complete the promise
                medium.Resolve(results);
                return medium;
            }

            // once the bundle has failed or has been cancelled the dependencies are cancelled too
            medium.On(() => {
                foreach(var p2 in that)
                    p2.Cancel();
            }, PromiseEventType.ErrorOrCancel);

            int i = 0;
            foreach (var p in that) {
                // a per-iteration copy, the closure must not observe later increments of i
                var idx = i;
                p.On(
                    x => {
                        results[idx] = x;
                        // the last resolved dependency completes the bundle
                        if (Interlocked.Decrement(ref count) == 0)
                            medium.Resolve(results);
                    },
                    error => {
                        // only the first failure or cancellation is reported
                        if (Interlocked.Increment(ref errors) == 1)
                            medium.Reject(
                                new Exception("The dependency promise is failed", error)
                            );
                    },
                    reason => {
                        if (Interlocked.Increment(ref errors) == 1)
                            medium.Cancel(
                                new Exception("The dependency promise is cancelled", reason)
                            );
                    }
                );
                i++;
            }

            return medium;
        }

        /// <summary>
        /// Attaches handlers to the promise and returns a new promise (an <c>ActionTask</c>)
        /// which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): the meaning of the last constructor argument (<c>false</c>) is
        /// defined by <c>ActionTask</c> and is not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise Then(this IPromise that, Action success, Action<Exception> error, Action<Exception> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new ActionTask(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Attaches success and error handlers to the promise; no cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise Then(this IPromise that, Action success, Action<Exception> error) {
            Action<Exception> noCancel = null;
            return Then(that, success, error, noCancel);
        }

        /// <summary>
        /// Attaches a success handler to the promise; no error or cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise Then(this IPromise that, Action success) {
            Action<Exception> noError = null;
            Action<Exception> noCancel = null;
            return Then(that, success, noError, noCancel);
        }

        /// <summary>
        /// Attaches value-producing handlers to the promise and returns a new promise
        /// (a <c>FuncTask{T}</c>) which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): the meaning of the last constructor argument (<c>false</c>) is
        /// defined by <c>FuncTask{T}</c> and is not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <typeparam name="T">The type of the value produced by the handlers.</typeparam>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error, Func<Exception, T> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new FuncTask<T>(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Attaches value-producing success and error handlers to the promise; no
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <typeparam name="T">The type of the value produced by the handlers.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T> Then<T>(this IPromise that, Func<T> success, Func<Exception, T> error) {
            Func<Exception, T> noCancel = null;
            return Then<T>(that, success, error, noCancel);
        }

        /// <summary>
        /// Attaches a value-producing success handler to the promise; no error or
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <typeparam name="T">The type of the value produced by the handler.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T> Then<T>(this IPromise that, Func<T> success) {
            Func<Exception, T> noError = null;
            Func<Exception, T> noCancel = null;
            return Then<T>(that, success, noError, noCancel);
        }

        /// <summary>
        /// Attaches transforming handlers to the promise and returns a new promise
        /// (a <c>FuncTask{T,T2}</c>) which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): the meaning of the last constructor argument (<c>false</c>) is
        /// defined by <c>FuncTask{T,T2}</c> and is not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the value produced by the handlers.</typeparam>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error, Func<Exception, T2> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new FuncTask<T,T2>(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Attaches transforming success and error handlers to the promise; no cancellation
        /// handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the value produced by the handlers.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success, Func<Exception, T2> error) {
            Func<Exception, T2> noCancel = null;
            return Then<T, T2>(that, success, error, noCancel);
        }

        /// <summary>
        /// Attaches a transforming success handler to the promise; no error or cancellation
        /// handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the value produced by the handler.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T2> Then<T, T2>(this IPromise<T> that, Func<T, T2> success) {
            Func<Exception, T2> noError = null;
            Func<Exception, T2> noCancel = null;
            return Then<T, T2>(that, success, noError, noCancel);
        }

        #region chain traits
        /// <summary>
        /// Attaches promise-returning handlers to the promise and returns a new promise
        /// (an <c>ActionChainTask</c>) which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): how the promises returned by the handlers are linked to the result,
        /// and the meaning of the last constructor argument (<c>false</c>), are defined by
        /// <c>ActionChainTask</c> and are not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error, Func<Exception,IPromise> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new ActionChainTask(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Chains promise-returning success and error handlers; no cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise Chain(this IPromise that, Func<IPromise> success, Func<Exception,IPromise> error) {
            Func<Exception, IPromise> noCancel = null;
            return Chain(that, success, error, noCancel);
        }

        /// <summary>
        /// Chains a promise-returning success handler; no error or cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise Chain(this IPromise that, Func<IPromise> success) {
            Func<Exception, IPromise> noError = null;
            Func<Exception, IPromise> noCancel = null;
            return Chain(that, success, noError, noCancel);
        }

        /// <summary>
        /// Attaches promise-returning handlers to the promise and returns a new promise
        /// (a <c>FuncChainTask{T}</c>) which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): how the promises returned by the handlers are linked to the result,
        /// and the meaning of the last constructor argument (<c>false</c>), are defined by
        /// <c>FuncChainTask{T}</c> and are not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <typeparam name="T">The type of the result of the promises returned by the handlers.</typeparam>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error, Func<Exception, IPromise<T>> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new FuncChainTask<T>(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Chains promise-returning success and error handlers which produce a value; no
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <typeparam name="T">The type of the result of the promises returned by the handlers.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success, Func<Exception, IPromise<T>> error) {
            Func<Exception, IPromise<T>> noCancel = null;
            return Chain<T>(that, success, error, noCancel);
        }

        /// <summary>
        /// Chains a promise-returning success handler which produces a value; no error or
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <typeparam name="T">The type of the result of the promise returned by the handler.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T> Chain<T>(this IPromise that, Func<IPromise<T>> success) {
            Func<Exception, IPromise<T>> noError = null;
            Func<Exception, IPromise<T>> noCancel = null;
            return Chain<T>(that, success, noError, noCancel);
        }

        /// <summary>
        /// Attaches promise-returning handlers to the promise and returns a new promise
        /// (a <c>FuncChainTask{T,T2}</c>) which is driven by the outcome of <paramref name="that"/>.
        /// </summary>
        /// <remarks>
        /// When <paramref name="success"/> is specified, a cancellation request made on the
        /// returned promise is forwarded to <paramref name="that"/>.
        /// NOTE(review): how the promises returned by the handlers are linked to the result,
        /// and the meaning of the last constructor argument (<c>false</c>), are defined by
        /// <c>FuncChainTask{T,T2}</c> and are not visible here.
        /// </remarks>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The handler passed to the task as the success handler.</param>
        /// <param name="error">The handler passed to the task as the error handler.</param>
        /// <param name="cancel">The handler passed to the task as the cancellation handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the result of the promises returned by the handlers.</typeparam>
        /// <returns>The promise of the attached task.</returns>
        public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error, Func<Exception, IPromise<T2>> cancel) {
            Safe.ArgumentNotNull(that, "that");

            var task = new FuncChainTask<T,T2>(success, error, cancel, false);

            // the outcome of the source promise drives the task
            that.On(task.Resolve, task.Reject, task.CancelOperation);

            if (success != null) {
                // cancelling the task cancels the source promise
                task.CancellationRequested(that.Cancel);
            }

            return task;
        }

        /// <summary>
        /// Chains promise-returning success and error handlers to a promise with a result; no
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handlers to.</param>
        /// <param name="success">The success handler.</param>
        /// <param name="error">The error handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the result of the promises returned by the handlers.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success, Func<Exception, IPromise<T2>> error) {
            Func<Exception, IPromise<T2>> noCancel = null;
            return Chain<T, T2>(that, success, error, noCancel);
        }

        /// <summary>
        /// Chains a promise-returning success handler to a promise with a result; no error or
        /// cancellation handler is used.
        /// </summary>
        /// <param name="that">The promise to attach the handler to.</param>
        /// <param name="success">The success handler.</param>
        /// <typeparam name="T">The type of the result of the source promise.</typeparam>
        /// <typeparam name="T2">The type of the result of the promise returned by the handler.</typeparam>
        /// <returns>The promise returned by the four-argument overload.</returns>
        public static IPromise<T2> Chain<T, T2>(this IPromise<T> that, Func<T, IPromise<T2>> success) {
            Func<Exception, IPromise<T2>> noError = null;
            Func<Exception, IPromise<T2>> noCancel = null;
            return Chain<T, T2>(that, success, noError, noCancel);
        }

        #endregion

            
        #if NET_4_5

        /// <summary>
        /// Exposes the promise as a <see cref="Task{T}"/>.
        /// </summary>
        /// <remarks>
        /// The task gets the result or the exception of the promise; when the promise is
        /// cancelled the task is moved to the cancelled state and the cancellation reason
        /// is discarded.
        /// </remarks>
        /// <param name="that">The promise to expose.</param>
        /// <typeparam name="T">The type of the promise result.</typeparam>
        /// <returns>The task which completes together with the promise.</returns>
        public static Task<T> GetTask<T>(this IPromise<T> that) {
            Safe.ArgumentNotNull(that, "that");

            var completion = new TaskCompletionSource<T>();

            that.On(
                completion.SetResult,
                completion.SetException,
                reason => completion.SetCanceled()
            );

            return completion.Task;
        }

        #endif
    }
}