Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 34:dabf79fde388
fixed race condition in DispatchPool
author | cin |
---|---|
date | Thu, 10 Apr 2014 04:20:25 +0400 |
parents | 6a56df4ec59e |
children | 2880242f987a |
rev | line source |
---|---|
12 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
/// <summary>
/// A dispatch pool whose work items are <see cref="Action"/> delegates kept in an
/// <c>MTQueue&lt;Action&gt;</c>. The pool is asked to grow when the number of pending
/// items exceeds <c>threshold * ActiveThreads</c>.
/// </summary>
public class WorkerPool : DispatchPool<Action> {

    // Pending work items.
    MTQueue<Action> m_queue = new MTQueue<Action>();

    // Number of pending items. It is incremented BEFORE an item is put into m_queue and
    // decremented AFTER an item has been taken out, so it never under-reports pending work.
    int m_queueLength = 0;

    // Pending-items-per-active-thread ratio above which EnqueueTask calls GrowPool.
    readonly int m_threshold = 1;

    /// <summary>
    /// Creates the pool with an explicit growth threshold.
    /// </summary>
    /// <param name="minThreads">Forwarded to the base class constructor.</param>
    /// <param name="maxThreads">Forwarded to the base class constructor.</param>
    /// <param name="threshold">
    /// The pool is asked to grow when the pending item count exceeds
    /// <c>threshold * ActiveThreads</c>.
    /// </param>
    public WorkerPool(int minThreads, int maxThreads, int threshold)
        : base(minThreads, maxThreads) {
        m_threshold = threshold;
        // NOTE(review): InitPool is not declared in this class; presumably inherited from DispatchPool.
        InitPool();
    }

    /// <summary>
    /// Creates the pool with the default growth threshold of 1.
    /// </summary>
    /// <param name="minThreads">Forwarded to the base class constructor.</param>
    /// <param name="maxThreads">Forwarded to the base class constructor.</param>
    public WorkerPool(int minThreads, int maxThreads)
        : base(minThreads, maxThreads) {
        InitPool();
    }

    /// <summary>
    /// Creates the pool with the default growth threshold of 1.
    /// </summary>
    /// <param name="threads">Forwarded to the base class constructor.</param>
    public WorkerPool(int threads)
        : base(threads) {
        InitPool();
    }

    /// <summary>
    /// Creates the pool using the base class defaults and the default growth threshold of 1.
    /// </summary>
    public WorkerPool()
        : base() {
        InitPool();
    }

    /// <summary>
    /// Queues <paramref name="task"/> for execution by the pool.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the task.</typeparam>
    /// <param name="task">The function to run.</param>
    /// <returns>
    /// A promise that is resolved with the value returned by <paramref name="task"/>,
    /// or rejected with the exception it throws.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (task == null)
            throw new ArgumentNullException("task");
        if (IsDisposed)
            throw new ObjectDisposedException(ToString());

        var promise = new Promise<T>();

        EnqueueTask(delegate() {
            try {
                promise.Resolve(task());
            } catch (Exception e) {
                // Any failure of the task is reported through the promise.
                promise.Reject(e);
            }
        });

        return promise;
    }

    /// <summary>
    /// Adds a work item to the queue and asks the pool to grow when the number of
    /// pending items exceeds <c>m_threshold * ActiveThreads</c>.
    /// </summary>
    /// <param name="unit">The work item to run.</param>
    /// <exception cref="ArgumentNullException"><paramref name="unit"/> is <c>null</c>.</exception>
    protected void EnqueueTask(Action unit) {
        // Debug.Assert is compiled out of builds without DEBUG, which would let a null
        // item reach InvokeUnit and fail there; reject it here, at the call site, instead.
        if (unit == null)
            throw new ArgumentNullException("unit");

        // Count the item before publishing it, see the note on m_queueLength.
        var len = Interlocked.Increment(ref m_queueLength);
        m_queue.Enqueue(unit);

        if (len > m_threshold * ActiveThreads)
            GrowPool();
    }

    /// <summary>
    /// Tries to take one work item from the queue.
    /// </summary>
    /// <param name="unit">Receives the item when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> if an item was taken; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out Action unit) {
        if (m_queue.TryDequeue(out unit)) {
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Returns <c>true</c> immediately when work is pending; otherwise defers to the
    /// base implementation.
    /// </summary>
    protected override bool Suspend() {
        // This override solves the following race condition:
        //
        // WORKER                          CLIENT
        // -----------------------------------------------------------
        // TryDequeue == false
        //                                 Enqueue(unit), queueLen++
        //                                 GrowPool? == NO
        // ActiveThreads--
        // Suspend
        //    queueLength > 0
        // continue
        //
        // Without this check the item queued by the client could be left with no
        // running worker to pick it up.
        if (m_queueLength > 0)
            return true;
        return base.Suspend();
    }

    /// <summary>
    /// Executes a single work item.
    /// </summary>
    /// <param name="unit">The work item to run.</param>
    protected override void InvokeUnit(Action unit) {
        unit();
    }

}
93 } |