Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 111:0fdaf280c797 v1
minor fix
author | cin |
---|---|
date | Wed, 19 Nov 2014 03:18:42 +0400 |
parents | fe33f4e02ad5 |
children | 4f20870d0816 |
rev | line source |
---|---|
12 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
35 | 7 using Implab.Diagnostics; |
12 | 8 |
9 namespace Implab.Parallels { | |
15 | 10 public class WorkerPool : DispatchPool<Action> { |
12 | 11 |
15 | 12 MTQueue<Action> m_queue = new MTQueue<Action>(); |
13 int m_queueLength = 0; | |
16 | 14 readonly int m_threshold = 1; |
13 | 15 |
16 | 16 public WorkerPool(int minThreads, int maxThreads, int threshold) |
15 | 17 : base(minThreads, maxThreads) { |
16 | 18 m_threshold = threshold; |
19 InitPool(); | |
20 } | |
21 | |
22 public WorkerPool(int minThreads, int maxThreads) : | |
23 base(minThreads, maxThreads) { | |
24 InitPool(); | |
13 | 25 } |
26 | |
15 | 27 public WorkerPool(int threads) |
28 : base(threads) { | |
16 | 29 InitPool(); |
13 | 30 } |
31 | |
15 | 32 public WorkerPool() |
33 : base() { | |
16 | 34 InitPool(); |
13 | 35 } |
36 | |
12 | 37 public Promise<T> Invoke<T>(Func<T> task) { |
38 if (task == null) | |
39 throw new ArgumentNullException("task"); | |
15 | 40 if (IsDisposed) |
41 throw new ObjectDisposedException(ToString()); | |
12 | 42 |
43 var promise = new Promise<T>(); | |
44 | |
40 | 45 var caller = TraceContext.Snapshot(); |
35 | 46 |
15 | 47 EnqueueTask(delegate() { |
40 | 48 caller.Invoke(delegate() { |
49 try { | |
50 promise.Resolve(task()); | |
51 } catch (Exception e) { | |
52 promise.Reject(e); | |
53 } | |
54 }); | |
13 | 55 }); |
12 | 56 |
57 return promise; | |
58 } | |
59 | |
15 | 60 protected void EnqueueTask(Action unit) { |
61 Debug.Assert(unit != null); | |
16 | 62 var len = Interlocked.Increment(ref m_queueLength); |
15 | 63 m_queue.Enqueue(unit); |
16 | 64 |
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
65 if (len > m_threshold*ActiveThreads) |
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
66 GrowPool(); |
12 | 67 } |
68 | |
15 | 69 protected override bool TryDequeue(out Action unit) { |
70 if (m_queue.TryDequeue(out unit)) { | |
71 Interlocked.Decrement(ref m_queueLength); | |
72 return true; | |
12 | 73 } |
15 | 74 return false; |
12 | 75 } |
76 | |
34 | 77 protected override bool Suspend() { |
78 // This override solves race condition | |
79 // WORKER CLIENT | |
80 // --------------------------------------- | |
81 // TryDeque == false | |
82 // Enqueue(unit), queueLen++ | |
83 // GrowPool? == NO | |
84 // ActiveThreads-- | |
85 // Suspend | |
86 // queueLength > 0 | |
87 // continue | |
88 if (m_queueLength > 0) | |
89 return true; | |
90 return base.Suspend(); | |
91 } | |
92 | |
15 | 93 protected override void InvokeUnit(Action unit) { |
94 unit(); | |
12 | 95 } |
16 | 96 |
12 | 97 } |
98 } |