Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | 4c0e5ef99986 |
children | 471f596b2603 |
rev | line source |
---|---|
12 | 1 using System; |
2 using System.Threading; | |
3 using System.Diagnostics; | |
35 | 4 using Implab.Diagnostics; |
12 | 5 |
6 namespace Implab.Parallels { | |
15 | 7 public class WorkerPool : DispatchPool<Action> { |
12 | 8 |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
92
diff
changeset
|
9 AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); |
15 | 10 int m_queueLength = 0; |
16 | 11 readonly int m_threshold = 1; |
13 | 12 |
16 | 13 public WorkerPool(int minThreads, int maxThreads, int threshold) |
15 | 14 : base(minThreads, maxThreads) { |
16 | 15 m_threshold = threshold; |
16 InitPool(); | |
17 } | |
18 | |
19 public WorkerPool(int minThreads, int maxThreads) : | |
20 base(minThreads, maxThreads) { | |
21 InitPool(); | |
13 | 22 } |
23 | |
15 | 24 public WorkerPool(int threads) |
25 : base(threads) { | |
16 | 26 InitPool(); |
13 | 27 } |
28 | |
92 | 29 public WorkerPool() { |
16 | 30 InitPool(); |
13 | 31 } |
32 | |
12 | 33 public Promise<T> Invoke<T>(Func<T> task) { |
34 if (task == null) | |
35 throw new ArgumentNullException("task"); | |
15 | 36 if (IsDisposed) |
37 throw new ObjectDisposedException(ToString()); | |
12 | 38 |
39 var promise = new Promise<T>(); | |
40 | |
92 | 41 var lop = TraceContext.Instance.CurrentOperation; |
35 | 42 |
15 | 43 EnqueueTask(delegate() { |
92 | 44 TraceContext.Instance.EnterLogicalOperation(lop, false); |
45 try { | |
46 promise.Resolve(task()); | |
47 } catch (Exception e) { | |
48 promise.Reject(e); | |
49 } finally { | |
50 TraceContext.Instance.Leave(); | |
51 } | |
13 | 52 }); |
12 | 53 |
54 return promise; | |
55 } | |
56 | |
15 | 57 protected void EnqueueTask(Action unit) { |
58 Debug.Assert(unit != null); | |
16 | 59 var len = Interlocked.Increment(ref m_queueLength); |
15 | 60 m_queue.Enqueue(unit); |
16 | 61 |
81 | 62 if (len > m_threshold * PoolSize) { |
63 StartWorker(); | |
80 | 64 } |
81 | 65 |
66 SignalThread(); | |
12 | 67 } |
68 | |
15 | 69 protected override bool TryDequeue(out Action unit) { |
70 if (m_queue.TryDequeue(out unit)) { | |
71 Interlocked.Decrement(ref m_queueLength); | |
72 return true; | |
12 | 73 } |
15 | 74 return false; |
12 | 75 } |
76 | |
15 | 77 protected override void InvokeUnit(Action unit) { |
78 unit(); | |
12 | 79 } |
16 | 80 |
12 | 81 } |
82 } |