comparison Implab/Parallels/WorkerPool.cs @ 92:4c0e5ef99986 v2

rewritten tracing
author cin
date Wed, 22 Oct 2014 18:37:56 +0400
parents 2c5631b43c7d
children 2573b562e328
comparison
equal deleted inserted replaced
91:cdaaf4792c22 92:4c0e5ef99986
1 using System; 1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading; 2 using System.Threading;
6 using System.Diagnostics; 3 using System.Diagnostics;
7 using Implab.Diagnostics; 4 using Implab.Diagnostics;
8 5
9 namespace Implab.Parallels { 6 namespace Implab.Parallels {
10 public class WorkerPool : DispatchPool<Action> { 7 public class WorkerPool : DispatchPool<Action> {
11 8
12 MTQueue<Action> m_queue = new MTQueue<Action>(); 9 MTQueue<Action> m_queue = new MTQueue<Action>();
13 int m_queueLength = 0; 10 int m_queueLength = 0;
14 readonly int m_threshold = 1; 11 readonly int m_threshold = 1;
15 int m_workers = 0;
16 12
17 public WorkerPool(int minThreads, int maxThreads, int threshold) 13 public WorkerPool(int minThreads, int maxThreads, int threshold)
18 : base(minThreads, maxThreads) { 14 : base(minThreads, maxThreads) {
19 m_threshold = threshold; 15 m_threshold = threshold;
20 m_workers = minThreads;
21 InitPool(); 16 InitPool();
22 } 17 }
23 18
24 public WorkerPool(int minThreads, int maxThreads) : 19 public WorkerPool(int minThreads, int maxThreads) :
25 base(minThreads, maxThreads) { 20 base(minThreads, maxThreads) {
26 m_workers = minThreads;
27 InitPool(); 21 InitPool();
28 } 22 }
29 23
30 public WorkerPool(int threads) 24 public WorkerPool(int threads)
31 : base(threads) { 25 : base(threads) {
32 m_workers = threads;
33 InitPool(); 26 InitPool();
34 } 27 }
35 28
36 public WorkerPool() 29 public WorkerPool() {
37 : base() {
38 InitPool(); 30 InitPool();
39 } 31 }
40 32
41 public Promise<T> Invoke<T>(Func<T> task) { 33 public Promise<T> Invoke<T>(Func<T> task) {
42 if (task == null) 34 if (task == null)
44 if (IsDisposed) 36 if (IsDisposed)
45 throw new ObjectDisposedException(ToString()); 37 throw new ObjectDisposedException(ToString());
46 38
47 var promise = new Promise<T>(); 39 var promise = new Promise<T>();
48 40
49 var caller = TraceContext.Snapshot(); 41 var lop = TraceContext.Instance.CurrentOperation;
50 42
51 EnqueueTask(delegate() { 43 EnqueueTask(delegate() {
52 caller.Invoke(delegate() { 44 TraceContext.Instance.EnterLogicalOperation(lop, false);
53 try { 45 try {
54 promise.Resolve(task()); 46 promise.Resolve(task());
55 } catch (Exception e) { 47 } catch (Exception e) {
56 promise.Reject(e); 48 promise.Reject(e);
57 } 49 } finally {
58 }); 50 TraceContext.Instance.Leave();
51 }
59 }); 52 });
60 53
61 return promise; 54 return promise;
62 } 55 }
63 56