Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | eb793fbbe4ea |
children |
comparison
equal
deleted
inserted
replaced
71:1714fd8678ef | 192:f1da3afc3521 |
---|---|
1 using System; | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | 2 using System.Threading; |
6 using System.Diagnostics; | 3 using System.Diagnostics; |
7 using Implab.Diagnostics; | 4 using Implab.Diagnostics; |
8 | 5 |
9 namespace Implab.Parallels { | 6 namespace Implab.Parallels { |
10 public class WorkerPool : DispatchPool<Action> { | 7 public class WorkerPool : DispatchPool<Action> { |
11 | 8 |
12 MTQueue<Action> m_queue = new MTQueue<Action>(); | 9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); |
13 int m_queueLength = 0; | 10 int m_queueLength; |
14 readonly int m_threshold = 1; | 11 readonly int m_threshold = 1; |
15 | 12 |
16 public WorkerPool(int minThreads, int maxThreads, int threshold) | 13 public WorkerPool(int minThreads, int maxThreads, int threshold) |
17 : base(minThreads, maxThreads) { | 14 : base(minThreads, maxThreads) { |
18 m_threshold = threshold; | 15 m_threshold = threshold; |
27 public WorkerPool(int threads) | 24 public WorkerPool(int threads) |
28 : base(threads) { | 25 : base(threads) { |
29 InitPool(); | 26 InitPool(); |
30 } | 27 } |
31 | 28 |
32 public WorkerPool() | 29 public WorkerPool() { |
33 : base() { | |
34 InitPool(); | 30 InitPool(); |
35 } | 31 } |
36 | 32 |
37 public Promise<T> Invoke<T>(Func<T> task) { | 33 public IPromise<T> Invoke<T>(Func<T> task) { |
34 if (task == null) | |
35 throw new ArgumentNullException("task"); | |
36 if (IsDisposed) | |
37 throw new ObjectDisposedException(ToString()); | |
38 | |
39 var promise = new FuncTask<T>(task, null, null, true); | |
40 | |
41 var lop = TraceContext.Instance.CurrentOperation; | |
42 | |
43 EnqueueTask(delegate { | |
44 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
45 | |
46 promise.Resolve(); | |
47 | |
48 TraceContext.Instance.Leave(); | |
49 }); | |
50 | |
51 return promise; | |
52 } | |
53 | |
54 public IPromise Invoke(Action task) { | |
55 if (task == null) | |
56 throw new ArgumentNullException("task"); | |
57 if (IsDisposed) | |
58 throw new ObjectDisposedException(ToString()); | |
59 | |
60 var promise = new ActionTask(task, null, null, true); | |
61 | |
62 var lop = TraceContext.Instance.CurrentOperation; | |
63 | |
64 EnqueueTask(delegate { | |
65 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
66 | |
67 promise.Resolve(); | |
68 | |
69 TraceContext.Instance.Leave(); | |
70 }); | |
71 | |
72 return promise; | |
73 } | |
74 | |
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { | |
38 if (task == null) | 76 if (task == null) |
39 throw new ArgumentNullException("task"); | 77 throw new ArgumentNullException("task"); |
40 if (IsDisposed) | 78 if (IsDisposed) |
41 throw new ObjectDisposedException(ToString()); | 79 throw new ObjectDisposedException(ToString()); |
42 | 80 |
43 var promise = new Promise<T>(); | 81 var promise = new Promise<T>(); |
44 | 82 |
45 var caller = TraceContext.Snapshot(); | 83 var lop = TraceContext.Instance.CurrentOperation; |
46 | 84 |
47 EnqueueTask(delegate() { | 85 EnqueueTask(delegate { |
48 caller.Invoke(delegate() { | 86 TraceContext.Instance.EnterLogicalOperation(lop, false); |
49 try { | 87 try { |
50 promise.Resolve(task()); | 88 if (!promise.CancelOperationIfRequested()) |
51 } catch (Exception e) { | 89 promise.Resolve(task(promise)); |
52 promise.Reject(e); | 90 } catch (Exception e) { |
91 promise.Reject(e); | |
92 } finally { | |
93 TraceContext.Instance.Leave(); | |
94 } | |
95 }); | |
96 | |
97 return promise; | |
98 } | |
99 | |
100 public IPromise Invoke<T>(Action<ICancellationToken> task) { | |
101 if (task == null) | |
102 throw new ArgumentNullException("task"); | |
103 if (IsDisposed) | |
104 throw new ObjectDisposedException(ToString()); | |
105 | |
106 var promise = new Promise(); | |
107 | |
108 var lop = TraceContext.Instance.CurrentOperation; | |
109 | |
110 EnqueueTask(delegate { | |
111 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
112 try { | |
113 if (!promise.CancelOperationIfRequested()) { | |
114 task(promise); | |
115 promise.Resolve(); | |
53 } | 116 } |
54 }); | 117 } catch (Exception e) { |
118 promise.Reject(e); | |
119 } finally { | |
120 TraceContext.Instance.Leave(); | |
121 } | |
55 }); | 122 }); |
56 | 123 |
57 return promise; | 124 return promise; |
58 } | 125 } |
59 | 126 |
60 protected void EnqueueTask(Action unit) { | 127 protected void EnqueueTask(Action unit) { |
61 Debug.Assert(unit != null); | 128 Debug.Assert(unit != null); |
62 var len = Interlocked.Increment(ref m_queueLength); | 129 var len = Interlocked.Increment(ref m_queueLength); |
63 m_queue.Enqueue(unit); | 130 m_queue.Enqueue(unit); |
64 | 131 |
65 if (len > m_threshold*ActiveThreads) | 132 if (len > m_threshold * PoolSize) { |
66 GrowPool(); | 133 StartWorker(); |
134 } | |
135 | |
136 SignalThread(); | |
67 } | 137 } |
68 | 138 |
69 protected override bool TryDequeue(out Action unit) { | 139 protected override bool TryDequeue(out Action unit) { |
70 if (m_queue.TryDequeue(out unit)) { | 140 if (m_queue.TryDequeue(out unit)) { |
71 Interlocked.Decrement(ref m_queueLength); | 141 Interlocked.Decrement(ref m_queueLength); |
72 return true; | 142 return true; |
73 } | 143 } |
74 return false; | 144 return false; |
75 } | 145 } |
76 | 146 |
77 protected override bool Suspend() { | |
78 // This override solves race condition | |
79 // WORKER CLIENT | |
80 // --------------------------------------- | |
81 // TryDeque == false | |
82 // Enqueue(unit), queueLen++ | |
83 // GrowPool? == NO | |
84 // ActiveThreads-- | |
85 // Suspend | |
86 // queueLength > 0 | |
87 // continue | |
88 if (m_queueLength > 0) | |
89 return true; | |
90 return base.Suspend(); | |
91 } | |
92 | |
93 protected override void InvokeUnit(Action unit) { | 147 protected override void InvokeUnit(Action unit) { |
94 unit(); | 148 unit(); |
95 } | 149 } |
96 | 150 |
97 } | 151 } |