Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 205:8200ab154c8a v2
Added ResetState to RunnableComponent to reset in case of failure
Added StateChanged event to IRunnable
Renamed Promise.SUCCESS -> Promise.Success
Added Promise.FromException
Renamed Bundle -> PromiseAll in PromiseExtensions
author | cin |
---|---|
date | Tue, 25 Oct 2016 17:40:33 +0300 |
parents | eb793fbbe4ea |
children |
rev | line source |
---|---|
12 | 1 using System; |
2 using System.Threading; | |
3 using System.Diagnostics; | |
35 | 4 using Implab.Diagnostics; |
12 | 5 |
6 namespace Implab.Parallels { | |
15 | 7 public class WorkerPool : DispatchPool<Action> { |
12 | 8 |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
92
diff
changeset
|
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); |
129 | 10 int m_queueLength; |
16 | 11 readonly int m_threshold = 1; |
13 | 12 |
16 | 13 public WorkerPool(int minThreads, int maxThreads, int threshold) |
15 | 14 : base(minThreads, maxThreads) { |
16 | 15 m_threshold = threshold; |
16 InitPool(); | |
17 } | |
18 | |
19 public WorkerPool(int minThreads, int maxThreads) : | |
20 base(minThreads, maxThreads) { | |
21 InitPool(); | |
13 | 22 } |
23 | |
15 | 24 public WorkerPool(int threads) |
25 : base(threads) { | |
16 | 26 InitPool(); |
13 | 27 } |
28 | |
92 | 29 public WorkerPool() { |
16 | 30 InitPool(); |
13 | 31 } |
32 | |
149 | 33 public IPromise<T> Invoke<T>(Func<T> task) { |
34 if (task == null) | |
35 throw new ArgumentNullException("task"); | |
36 if (IsDisposed) | |
37 throw new ObjectDisposedException(ToString()); | |
38 | |
39 var promise = new FuncTask<T>(task, null, null, true); | |
40 | |
41 var lop = TraceContext.Instance.CurrentOperation; | |
42 | |
43 EnqueueTask(delegate { | |
44 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
45 | |
46 promise.Resolve(); | |
47 | |
48 TraceContext.Instance.Leave(); | |
49 }); | |
50 | |
51 return promise; | |
52 } | |
53 | |
54 public IPromise Invoke(Action task) { | |
55 if (task == null) | |
56 throw new ArgumentNullException("task"); | |
57 if (IsDisposed) | |
58 throw new ObjectDisposedException(ToString()); | |
59 | |
60 var promise = new ActionTask(task, null, null, true); | |
61 | |
62 var lop = TraceContext.Instance.CurrentOperation; | |
63 | |
64 EnqueueTask(delegate { | |
65 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
66 | |
67 promise.Resolve(); | |
68 | |
69 TraceContext.Instance.Leave(); | |
70 }); | |
71 | |
72 return promise; | |
73 } | |
74 | |
75 public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) { | |
12 | 76 if (task == null) |
77 throw new ArgumentNullException("task"); | |
15 | 78 if (IsDisposed) |
79 throw new ObjectDisposedException(ToString()); | |
12 | 80 |
81 var promise = new Promise<T>(); | |
82 | |
92 | 83 var lop = TraceContext.Instance.CurrentOperation; |
35 | 84 |
129 | 85 EnqueueTask(delegate { |
92 | 86 TraceContext.Instance.EnterLogicalOperation(lop, false); |
87 try { | |
149 | 88 if (!promise.CancelOperationIfRequested()) |
89 promise.Resolve(task(promise)); | |
90 } catch (Exception e) { | |
91 promise.Reject(e); | |
92 } finally { | |
93 TraceContext.Instance.Leave(); | |
94 } | |
95 }); | |
96 | |
97 return promise; | |
98 } | |
99 | |
100 public IPromise Invoke<T>(Action<ICancellationToken> task) { | |
101 if (task == null) | |
102 throw new ArgumentNullException("task"); | |
103 if (IsDisposed) | |
104 throw new ObjectDisposedException(ToString()); | |
105 | |
106 var promise = new Promise(); | |
107 | |
108 var lop = TraceContext.Instance.CurrentOperation; | |
109 | |
110 EnqueueTask(delegate { | |
111 TraceContext.Instance.EnterLogicalOperation(lop, false); | |
112 try { | |
113 if (!promise.CancelOperationIfRequested()) { | |
114 task(promise); | |
115 promise.Resolve(); | |
116 } | |
92 | 117 } catch (Exception e) { |
118 promise.Reject(e); | |
119 } finally { | |
120 TraceContext.Instance.Leave(); | |
121 } | |
13 | 122 }); |
12 | 123 |
124 return promise; | |
125 } | |
126 | |
15 | 127 protected void EnqueueTask(Action unit) { |
128 Debug.Assert(unit != null); | |
16 | 129 var len = Interlocked.Increment(ref m_queueLength); |
15 | 130 m_queue.Enqueue(unit); |
16 | 131 |
81 | 132 if (len > m_threshold * PoolSize) { |
133 StartWorker(); | |
80 | 134 } |
81 | 135 |
136 SignalThread(); | |
12 | 137 } |
138 | |
15 | 139 protected override bool TryDequeue(out Action unit) { |
140 if (m_queue.TryDequeue(out unit)) { | |
141 Interlocked.Decrement(ref m_queueLength); | |
142 return true; | |
12 | 143 } |
15 | 144 return false; |
12 | 145 } |
146 | |
15 | 147 protected override void InvokeUnit(Action unit) { |
148 unit(); | |
12 | 149 } |
16 | 150 |
12 | 151 } |
152 } |