view Implab/Parallels/WorkerPool.cs @ 111:0fdaf280c797 v1

minor fix
author cin
date Wed, 19 Nov 2014 03:18:42 +0400
parents fe33f4e02ad5
children 4f20870d0816
line wrap: on
line source

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using Implab.Diagnostics;

namespace Implab.Parallels {
    /// <summary>
    /// Dispatch pool whose units of work are <see cref="Action"/> delegates kept in an
    /// internal queue. Callers submit work through <c>Invoke</c> and observe the outcome
    /// through the promise it returns.
    /// </summary>
    public class WorkerPool : DispatchPool<Action> {

        // Pending units of work; filled by EnqueueTask and drained by TryDequeue.
        readonly MTQueue<Action> m_queue = new MTQueue<Action>();

        // Number of pending units. Accessed only through Interlocked: it is incremented
        // BEFORE the unit is placed into m_queue and decremented AFTER a successful
        // dequeue, so it may briefly exceed the number of units actually in the queue.
        int m_queueLength = 0;

        // Growth factor: EnqueueTask calls GrowPool once the pending count exceeds
        // m_threshold * ActiveThreads.
        readonly int m_threshold = 1;

        /// <summary>
        /// Creates the pool with explicit thread limits and growth threshold.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool constructor.</param>
        /// <param name="maxThreads">Forwarded to the base pool constructor.</param>
        /// <param name="threshold">
        /// Pending units allowed per active thread before the pool is asked to grow.
        /// The value is stored as given; it is not validated here.
        /// </param>
        public WorkerPool(int minThreads, int maxThreads, int threshold)
            : base(minThreads, maxThreads) {
            m_threshold = threshold;
            // InitPool is inherited (not visible in this file); it is called last so that
            // the fields above are already assigned when it runs.
            InitPool();
        }

        /// <summary>
        /// Creates the pool with explicit thread limits and the default threshold of 1.
        /// </summary>
        /// <param name="minThreads">Forwarded to the base pool constructor.</param>
        /// <param name="maxThreads">Forwarded to the base pool constructor.</param>
        public WorkerPool(int minThreads, int maxThreads) :
            base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool passing a single thread count to the base pool constructor;
        /// the threshold keeps its default of 1.
        /// </summary>
        /// <param name="threads">Forwarded to the base pool constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool with the base pool defaults and the default threshold of 1.
        /// </summary>
        public WorkerPool()
            : base() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution by the pool.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to execute.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by <paramref name="task"/>,
        /// or rejected with the exception it throws.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The pool is already disposed.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null)
                throw new ArgumentNullException("task");
            if (IsDisposed)
                throw new ObjectDisposedException(ToString());

            var promise = new Promise<T>();

            // Snapshot of the caller's trace context, taken on the submitting thread;
            // the task body is later run through it.
            var caller = TraceContext.Snapshot();

            EnqueueTask(delegate() {
                caller.Invoke(delegate() {
                    try {
                        promise.Resolve(task());
                    } catch (Exception e) {
                        // Any failure of the task is reported through the promise
                        // instead of escaping from the queued unit.
                        promise.Reject(e);
                    }
                });
            });

            return promise;
        }

        /// <summary>
        /// Adds a unit of work to the queue and asks the pool to grow when the number of
        /// pending units exceeds <c>threshold * ActiveThreads</c>.
        /// </summary>
        /// <param name="unit">The unit of work to queue.</param>
        /// <exception cref="ArgumentNullException"><paramref name="unit"/> is null.</exception>
        protected void EnqueueTask(Action unit) {
            // A null unit would otherwise be queued silently in release builds (where
            // Debug.Assert is compiled out) and only fail later inside InvokeUnit.
            if (unit == null)
                throw new ArgumentNullException("unit");

            // The counter is raised before the unit becomes visible in the queue, so a
            // worker checking it in Suspend never misses a unit that is being added.
            var len = Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            if (len > m_threshold*ActiveThreads)
                GrowPool();
        }

        /// <summary>
        /// Takes the next pending unit from the queue, if there is one.
        /// </summary>
        /// <param name="unit">Receives the dequeued unit when the method returns true.</param>
        /// <returns>True when a unit was dequeued; otherwise false.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (m_queue.TryDequeue(out unit)) {
                Interlocked.Decrement(ref m_queueLength);
                return true;
            }
            return false;
        }

        /// <summary>
        /// Returns true immediately, without calling the base implementation, while
        /// units are still pending; otherwise defers to the base implementation.
        /// </summary>
        /// <returns>
        /// True when pending units exist; otherwise the result of the base implementation.
        /// </returns>
        protected override bool Suspend() {
            // This override solves the following race condition:
            // WORKER                   CLIENT
            // ---------------------------------------
            // TryDequeue == false
            //                          Enqueue(unit), queueLen++
            //                          GrowPool? == NO
            // ActiveThreads--
            // Suspend
            //    queueLength > 0
            // continue
            //
            // The counter is written by other threads through Interlocked, so it is read
            // here with a fenced atomic read (CompareExchange with equal value and
            // comparand never changes the stored value) rather than a plain field read.
            if (Interlocked.CompareExchange(ref m_queueLength, 0, 0) > 0)
                return true;
            return base.Suspend();
        }

        /// <summary>
        /// Executes a dequeued unit of work by invoking the delegate.
        /// </summary>
        /// <param name="unit">The unit of work to run.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }

    }
}