comparison Implab/Parallels/WorkerPool.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents eb793fbbe4ea
children
comparison
equal deleted inserted replaced
71:1714fd8678ef 192:f1da3afc3521
1 using System; 1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading; 2 using System.Threading;
6 using System.Diagnostics; 3 using System.Diagnostics;
7 using Implab.Diagnostics; 4 using Implab.Diagnostics;
8 5
9 namespace Implab.Parallels { 6 namespace Implab.Parallels {
10 public class WorkerPool : DispatchPool<Action> { 7 public class WorkerPool : DispatchPool<Action> {
11 8
12 MTQueue<Action> m_queue = new MTQueue<Action>(); 9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>();
13 int m_queueLength = 0; 10 int m_queueLength;
14 readonly int m_threshold = 1; 11 readonly int m_threshold = 1;
15 12
16 public WorkerPool(int minThreads, int maxThreads, int threshold) 13 public WorkerPool(int minThreads, int maxThreads, int threshold)
17 : base(minThreads, maxThreads) { 14 : base(minThreads, maxThreads) {
18 m_threshold = threshold; 15 m_threshold = threshold;
27 public WorkerPool(int threads) 24 public WorkerPool(int threads)
28 : base(threads) { 25 : base(threads) {
29 InitPool(); 26 InitPool();
30 } 27 }
31 28
32 public WorkerPool() 29 public WorkerPool() {
33 : base() {
34 InitPool(); 30 InitPool();
35 } 31 }
36 32
37 public Promise<T> Invoke<T>(Func<T> task) { 33 public IPromise<T> Invoke<T>(Func<T> task) {
34 if (task == null)
35 throw new ArgumentNullException("task");
36 if (IsDisposed)
37 throw new ObjectDisposedException(ToString());
38
39 var promise = new FuncTask<T>(task, null, null, true);
40
41 var lop = TraceContext.Instance.CurrentOperation;
42
43 EnqueueTask(delegate {
44 TraceContext.Instance.EnterLogicalOperation(lop, false);
45
46 promise.Resolve();
47
48 TraceContext.Instance.Leave();
49 });
50
51 return promise;
52 }
53
54 public IPromise Invoke(Action task) {
55 if (task == null)
56 throw new ArgumentNullException("task");
57 if (IsDisposed)
58 throw new ObjectDisposedException(ToString());
59
60 var promise = new ActionTask(task, null, null, true);
61
62 var lop = TraceContext.Instance.CurrentOperation;
63
64 EnqueueTask(delegate {
65 TraceContext.Instance.EnterLogicalOperation(lop, false);
66
67 promise.Resolve();
68
69 TraceContext.Instance.Leave();
70 });
71
72 return promise;
73 }
74
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
38 if (task == null) 76 if (task == null)
39 throw new ArgumentNullException("task"); 77 throw new ArgumentNullException("task");
40 if (IsDisposed) 78 if (IsDisposed)
41 throw new ObjectDisposedException(ToString()); 79 throw new ObjectDisposedException(ToString());
42 80
43 var promise = new Promise<T>(); 81 var promise = new Promise<T>();
44 82
45 var caller = TraceContext.Snapshot(); 83 var lop = TraceContext.Instance.CurrentOperation;
46 84
47 EnqueueTask(delegate() { 85 EnqueueTask(delegate {
48 caller.Invoke(delegate() { 86 TraceContext.Instance.EnterLogicalOperation(lop, false);
49 try { 87 try {
50 promise.Resolve(task()); 88 if (!promise.CancelOperationIfRequested())
51 } catch (Exception e) { 89 promise.Resolve(task(promise));
52 promise.Reject(e); 90 } catch (Exception e) {
91 promise.Reject(e);
92 } finally {
93 TraceContext.Instance.Leave();
94 }
95 });
96
97 return promise;
98 }
99
100 public IPromise Invoke<T>(Action<ICancellationToken> task) {
101 if (task == null)
102 throw new ArgumentNullException("task");
103 if (IsDisposed)
104 throw new ObjectDisposedException(ToString());
105
106 var promise = new Promise();
107
108 var lop = TraceContext.Instance.CurrentOperation;
109
110 EnqueueTask(delegate {
111 TraceContext.Instance.EnterLogicalOperation(lop, false);
112 try {
113 if (!promise.CancelOperationIfRequested()) {
114 task(promise);
115 promise.Resolve();
53 } 116 }
54 }); 117 } catch (Exception e) {
118 promise.Reject(e);
119 } finally {
120 TraceContext.Instance.Leave();
121 }
55 }); 122 });
56 123
57 return promise; 124 return promise;
58 } 125 }
59 126
60 protected void EnqueueTask(Action unit) { 127 protected void EnqueueTask(Action unit) {
61 Debug.Assert(unit != null); 128 Debug.Assert(unit != null);
62 var len = Interlocked.Increment(ref m_queueLength); 129 var len = Interlocked.Increment(ref m_queueLength);
63 m_queue.Enqueue(unit); 130 m_queue.Enqueue(unit);
64 131
65 if (len > m_threshold*ActiveThreads) 132 if (len > m_threshold * PoolSize) {
66 GrowPool(); 133 StartWorker();
134 }
135
136 SignalThread();
67 } 137 }
68 138
69 protected override bool TryDequeue(out Action unit) { 139 protected override bool TryDequeue(out Action unit) {
70 if (m_queue.TryDequeue(out unit)) { 140 if (m_queue.TryDequeue(out unit)) {
71 Interlocked.Decrement(ref m_queueLength); 141 Interlocked.Decrement(ref m_queueLength);
72 return true; 142 return true;
73 } 143 }
74 return false; 144 return false;
75 } 145 }
76 146
77 protected override bool Suspend() {
78 // This override solves race condition
79 // WORKER CLIENT
80 // ---------------------------------------
81 // TryDeque == false
82 // Enqueue(unit), queueLen++
83 // GrowPool? == NO
84 // ActiveThreads--
85 // Suspend
86 // queueLength > 0
87 // continue
88 if (m_queueLength > 0)
89 return true;
90 return base.Suspend();
91 }
92
93 protected override void InvokeUnit(Action unit) { 147 protected override void InvokeUnit(Action unit) {
94 unit(); 148 unit();
95 } 149 }
96 150
97 } 151 }