view Implab/Parallels/AsyncPool.cs @ 203:4d9830a9bbb8 v2

Added a 'Fail' method to RunnableComponent, which allows a component to move from the Running to the Failed state. Added PollingComponent, a timer-based runnable component. More tests. Added FailPromise, a thin class to wrap exceptions. Fixed error handling in the SuccessPromise classes.
author cin
date Tue, 18 Oct 2016 17:49:54 +0300
parents 706fccb85524
children
line wrap: on
line source

using Implab.Diagnostics;
using System;
using System.Threading;
using System.Linq;

namespace Implab.Parallels {
	/// <summary>
	/// A class for parallelizing tasks.
	/// </summary>
	/// <remarks>
	/// Using this class together with lambda expressions, computations can be
	/// run in parallel; the concept of promises is used for this. Every method
	/// schedules the supplied delegate on another thread and returns a promise
	/// that is resolved once the delegate returns, or rejected with the
	/// exception the delegate throws.
	/// </remarks>
	public static class AsyncPool {

		/// <summary>
		/// Captures <c>TraceContext.Instance.CurrentOperation</c> on the calling
		/// thread and returns a delegate that passes it to
		/// <c>EnterLogicalOperation</c>, runs <paramref name="body"/>, forwards any
		/// exception to <paramref name="fail"/> and finally calls <c>Leave</c>.
		/// </summary>
		/// <param name="body">The work to run; expected to settle the promise on success.</param>
		/// <param name="fail">Receives any exception thrown by <paramref name="body"/>.</param>
		/// <returns>A delegate intended to be invoked on the worker thread.</returns>
		private static Action Traced(Action body, Action<Exception> fail) {
			// Must be read here, on the scheduling thread, not inside the returned delegate.
			var parentOperation = TraceContext.Instance.CurrentOperation;

			return () => {
				TraceContext.Instance.EnterLogicalOperation(parentOperation, false);
				try {
					body();
				} catch (Exception error) {
					fail(error);
				} finally {
					TraceContext.Instance.Leave();
				}
			};
		}

		/// <summary>
		/// Creates a new background thread that executes <paramref name="work"/>
		/// and starts it.
		/// </summary>
		private static void StartBackground(Action work) {
			var thread = new Thread(() => work()) { IsBackground = true };
			thread.Start();
		}

		/// <summary>
		/// Queues <paramref name="func"/> to the thread pool.
		/// </summary>
		/// <param name="func">The function to evaluate.</param>
		/// <returns>
		/// A promise resolved with the value returned by <paramref name="func"/>,
		/// or rejected with the exception caught while evaluating it.
		/// </returns>
		public static IPromise<T> Invoke<T>(Func<T> func) {
			var promise = new Promise<T>();
			var work = Traced(() => promise.Resolve(func()), error => promise.Reject(error));

			ThreadPool.QueueUserWorkItem(state => work());

			return promise;
		}

		/// <summary>
		/// Queues <paramref name="func"/> to the thread pool, passing it the
		/// returned promise as its <see cref="ICancellationToken"/> argument.
		/// </summary>
		/// <param name="func">The function to evaluate.</param>
		/// <returns>
		/// A promise resolved with the value returned by <paramref name="func"/>,
		/// or rejected with the exception caught while evaluating it.
		/// </returns>
		public static IPromise<T> Invoke<T>(Func<ICancellationToken, T> func) {
			var promise = new Promise<T>();
			var work = Traced(() => promise.Resolve(func(promise)), error => promise.Reject(error));

			ThreadPool.QueueUserWorkItem(state => work());

			return promise;
		}

		/// <summary>
		/// Runs <paramref name="func"/> on a newly created background thread.
		/// </summary>
		/// <param name="func">The function to evaluate.</param>
		/// <returns>
		/// A promise resolved with the value returned by <paramref name="func"/>,
		/// or rejected with the exception caught while evaluating it.
		/// </returns>
		public static IPromise<T> RunThread<T>(Func<T> func) {
			var promise = new Promise<T>();

			StartBackground(Traced(() => promise.Resolve(func()), error => promise.Reject(error)));

			return promise;
		}

		/// <summary>
		/// Runs <paramref name="func"/> on a newly created background thread,
		/// passing it the returned promise as its <see cref="ICancellationToken"/>
		/// argument.
		/// </summary>
		/// <param name="func">The function to evaluate.</param>
		/// <returns>
		/// A promise resolved with the value returned by <paramref name="func"/>,
		/// or rejected with the exception caught while evaluating it.
		/// </returns>
		public static IPromise<T> RunThread<T>(Func<ICancellationToken, T> func) {
			var promise = new Promise<T>();

			StartBackground(Traced(() => promise.Resolve(func(promise)), error => promise.Reject(error)));

			return promise;
		}

		/// <summary>
		/// Runs <paramref name="func"/> on a newly created background thread.
		/// </summary>
		/// <param name="func">The action to execute.</param>
		/// <returns>
		/// A promise resolved after <paramref name="func"/> returns, or rejected
		/// with the exception caught while executing it.
		/// </returns>
		public static IPromise RunThread(Action func) {
			var promise = new Promise();

			StartBackground(Traced(
				() => {
					func();
					promise.Resolve();
				},
				error => promise.Reject(error)
			));

			return promise;
		}

		/// <summary>
		/// Runs <paramref name="func"/> on a newly created background thread,
		/// passing it the returned promise as its <see cref="ICancellationToken"/>
		/// argument.
		/// </summary>
		/// <param name="func">The action to execute.</param>
		/// <returns>
		/// A promise resolved after <paramref name="func"/> returns, or rejected
		/// with the exception caught while executing it.
		/// </returns>
		public static IPromise RunThread(Action<ICancellationToken> func) {
			var promise = new Promise();

			StartBackground(Traced(
				() => {
					func(promise);
					promise.Resolve();
				},
				error => promise.Reject(error)
			));

			return promise;
		}

		/// <summary>
		/// Starts a separate background thread for every action in
		/// <paramref name="func"/>.
		/// </summary>
		/// <returns>One promise per action, in the same order as the input.</returns>
		public static IPromise[] RunThread(params Action[] func) {
			var promises = func.Select(action => RunThread(action));
			return promises.ToArray();
		}

		/// <summary>
		/// Starts a separate background thread for every cancellable action in
		/// <paramref name="func"/>.
		/// </summary>
		/// <returns>One promise per action, in the same order as the input.</returns>
		public static IPromise[] RunThread(params Action<ICancellationToken>[] func) {
			var promises = func.Select(action => RunThread(action));
			return promises.ToArray();
		}

		/// <summary>
		/// Starts a separate background thread for every function in
		/// <paramref name="func"/>.
		/// </summary>
		/// <returns>One promise per function, in the same order as the input.</returns>
		public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
			var promises = func.Select(producer => RunThread(producer));
			return promises.ToArray();
		}

		/// <summary>
		/// Starts a separate background thread for every cancellable function in
		/// <paramref name="func"/>.
		/// </summary>
		/// <returns>One promise per function, in the same order as the input.</returns>
		public static IPromise<T>[] RunThread<T>(params Func<ICancellationToken, T>[] func) {
			var promises = func.Select(producer => RunThread(producer));
			return promises.ToArray();
		}
	}
}