Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | fe33f4e02ad5 |
children | 2c5631b43c7d |
comparison
equal
deleted
inserted
replaced
79:05e6468f066f | 80:4f20870d0816 |
---|---|
10 public class WorkerPool : DispatchPool<Action> { | 10 public class WorkerPool : DispatchPool<Action> { |
11 | 11 |
12 MTQueue<Action> m_queue = new MTQueue<Action>(); | 12 MTQueue<Action> m_queue = new MTQueue<Action>(); |
13 int m_queueLength = 0; | 13 int m_queueLength = 0; |
14 readonly int m_threshold = 1; | 14 readonly int m_threshold = 1; |
15 int m_workers = 0; | |
15 | 16 |
16 public WorkerPool(int minThreads, int maxThreads, int threshold) | 17 public WorkerPool(int minThreads, int maxThreads, int threshold) |
17 : base(minThreads, maxThreads) { | 18 : base(minThreads, maxThreads) { |
18 m_threshold = threshold; | 19 m_threshold = threshold; |
20 m_workers = minThreads; | |
19 InitPool(); | 21 InitPool(); |
20 } | 22 } |
21 | 23 |
22 public WorkerPool(int minThreads, int maxThreads) : | 24 public WorkerPool(int minThreads, int maxThreads) : |
23 base(minThreads, maxThreads) { | 25 base(minThreads, maxThreads) { |
26 m_workers = minThreads; | |
24 InitPool(); | 27 InitPool(); |
25 } | 28 } |
26 | 29 |
27 public WorkerPool(int threads) | 30 public WorkerPool(int threads) |
28 : base(threads) { | 31 : base(threads) { |
32 m_workers = threads; | |
29 InitPool(); | 33 InitPool(); |
30 } | 34 } |
31 | 35 |
32 public WorkerPool() | 36 public WorkerPool() |
33 : base() { | 37 : base() { |
60 protected void EnqueueTask(Action unit) { | 64 protected void EnqueueTask(Action unit) { |
61 Debug.Assert(unit != null); | 65 Debug.Assert(unit != null); |
62 var len = Interlocked.Increment(ref m_queueLength); | 66 var len = Interlocked.Increment(ref m_queueLength); |
63 m_queue.Enqueue(unit); | 67 m_queue.Enqueue(unit); |
64 | 68 |
65 if (len > m_threshold*ActiveThreads) | 69 if (len > m_threshold * m_workers) { |
70 Interlocked.Increment(ref m_workers); | |
66 GrowPool(); | 71 GrowPool(); |
72 } | |
67 } | 73 } |
68 | 74 |
69 protected override bool TryDequeue(out Action unit) { | 75 protected override bool TryDequeue(out Action unit) { |
70 if (m_queue.TryDequeue(out unit)) { | 76 if (m_queue.TryDequeue(out unit)) { |
71 Interlocked.Decrement(ref m_queueLength); | 77 Interlocked.Decrement(ref m_queueLength); |
83 // GrowPool? == NO | 89 // GrowPool? == NO |
84 // ActiveThreads-- | 90 // ActiveThreads-- |
85 // Suspend | 91 // Suspend |
86 // queueLength > 0 | 92 // queueLength > 0 |
87 // continue | 93 // continue |
94 Thread.MemoryBarrier(); | |
88 if (m_queueLength > 0) | 95 if (m_queueLength > 0) |
89 return true; | 96 return true; |
97 Interlocked.Decrement(ref m_workers); | |
90 return base.Suspend(); | 98 return base.Suspend(); |
91 } | 99 } |
92 | 100 |
93 protected override void InvokeUnit(Action unit) { | 101 protected override void InvokeUnit(Action unit) { |
94 unit(); | 102 unit(); |