diff Implab/Parallels/WorkerPool.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents eb793fbbe4ea
children
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs	Wed Sep 03 18:34:02 2014 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -1,7 +1,4 @@
 using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
 using System.Threading;
 using System.Diagnostics;
 using Implab.Diagnostics;
@@ -9,8 +6,8 @@
 namespace Implab.Parallels {
     public class WorkerPool : DispatchPool<Action> {
 
-        MTQueue<Action> m_queue = new MTQueue<Action>();
-        int m_queueLength = 0;
+        AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
+        int m_queueLength;
         readonly int m_threshold = 1;
 
         public WorkerPool(int minThreads, int maxThreads, int threshold)
@@ -29,12 +26,53 @@
             InitPool();
         }
 
-        public WorkerPool()
-            : base() {
+        public WorkerPool() {
             InitPool();
         }
 
-        public Promise<T> Invoke<T>(Func<T> task) {
+        public IPromise<T> Invoke<T>(Func<T> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new FuncTask<T>(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise Invoke(Action task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new ActionTask(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
             if (task == null)
                 throw new ArgumentNullException("task");
             if (IsDisposed)
@@ -42,16 +80,45 @@
 
             var promise = new Promise<T>();
 
-            var caller = TraceContext.Snapshot();
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+                try {
+                    if (!promise.CancelOperationIfRequested())
+                        promise.Resolve(task(promise));
+                } catch (Exception e) {
+                    promise.Reject(e);
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
+            });
+
+            return promise;
+        }
 
-            EnqueueTask(delegate() {
-                caller.Invoke(delegate() {
-                    try {
-                        promise.Resolve(task());
-                    } catch (Exception e) {
-                        promise.Reject(e);
+        public IPromise Invoke<T>(Action<ICancellationToken> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new Promise();
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+                try {
+                    if (!promise.CancelOperationIfRequested()) {
+                        task(promise);
+                        promise.Resolve();
                     }
-                });
+                } catch (Exception e) {
+                    promise.Reject(e);
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
             });
 
             return promise;
@@ -62,8 +129,11 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            if (len > m_threshold*ActiveThreads)
-                GrowPool();
+            if (len > m_threshold * PoolSize) {
+                StartWorker();
+            }
+
+            SignalThread();
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -74,22 +144,6 @@
             return false;
         }
 
-        protected override bool Suspend() {
-            // This override solves race condition
-            // WORKER                   CLIENT
-            // ---------------------------------------
-            // TryDeque == false
-            //                          Enqueue(unit), queueLen++
-            //                          GrowPool? == NO
-            // ActiveThreads--
-            // Suspend
-            //    queueLength > 0
-            // continue
-            if (m_queueLength > 0)
-                return true;
-            return base.Suspend();
-        }
-
         protected override void InvokeUnit(Action unit) {
             unit();
         }