view Implab/Parallels/WorkerPool.cs @ 166:b84cdbe82e7f ref20160224

sync
author cin
date Mon, 29 Feb 2016 18:41:01 +0300
parents eb793fbbe4ea
children
line wrap: on
line source

using System;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    public class WorkerPool : DispatchPool<Action> {

        AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
        int m_queueLength;
        readonly int m_threshold = 1;

        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            InitPool();
        }

        public WorkerPool(int minThreads, int maxThreads) :
            base(minThreads, maxThreads) {
            InitPool();
        }

        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        public WorkerPool() {
            InitPool();
        }

        public IPromise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new FuncTask<T>(task, null, null, true);

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);

                promise.Resolve();

                TraceContext.Instance.Leave();
            });

            return promise;
        }

        public IPromise Invoke(Action task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new ActionTask(task, null, null, true);

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);

                promise.Resolve();

                TraceContext.Instance.Leave();
            });

            return promise;
        }

        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    if (!promise.CancelOperationIfRequested())
                        promise.Resolve(task(promise));
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        public IPromise Invoke<T>(Action<ICancellationToken> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise();

            var lop = TraceContext.Instance.CurrentOperation;

            EnqueueTask(delegate {
                TraceContext.Instance.EnterLogicalOperation(lop, false);
                try {
                    if (!promise.CancelOperationIfRequested()) {
                        task(promise);
                        promise.Resolve();
                    }
                } catch (Exception e) {
                    promise.Reject(e);
                } finally {
                    TraceContext.Instance.Leave();
                }
            });

            return promise;
        }

        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (len > m_threshold * PoolSize) {
                StartWorker();
            }

            SignalThread();
        }

        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }

        protected override void InvokeUnit(Action unit) {
            unit();
        }

    }
}