comparison Implab/Parallels/WorkerPool.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents fe33f4e02ad5
children 2c5631b43c7d
comparison
equal deleted inserted replaced
79:05e6468f066f 80:4f20870d0816
10 public class WorkerPool : DispatchPool<Action> { 10 public class WorkerPool : DispatchPool<Action> {
11 11
12 MTQueue<Action> m_queue = new MTQueue<Action>(); 12 MTQueue<Action> m_queue = new MTQueue<Action>();
13 int m_queueLength = 0; 13 int m_queueLength = 0;
14 readonly int m_threshold = 1; 14 readonly int m_threshold = 1;
15 int m_workers = 0;
15 16
16 public WorkerPool(int minThreads, int maxThreads, int threshold) 17 public WorkerPool(int minThreads, int maxThreads, int threshold)
17 : base(minThreads, maxThreads) { 18 : base(minThreads, maxThreads) {
18 m_threshold = threshold; 19 m_threshold = threshold;
20 m_workers = minThreads;
19 InitPool(); 21 InitPool();
20 } 22 }
21 23
22 public WorkerPool(int minThreads, int maxThreads) : 24 public WorkerPool(int minThreads, int maxThreads) :
23 base(minThreads, maxThreads) { 25 base(minThreads, maxThreads) {
26 m_workers = minThreads;
24 InitPool(); 27 InitPool();
25 } 28 }
26 29
27 public WorkerPool(int threads) 30 public WorkerPool(int threads)
28 : base(threads) { 31 : base(threads) {
32 m_workers = threads;
29 InitPool(); 33 InitPool();
30 } 34 }
31 35
32 public WorkerPool() 36 public WorkerPool()
33 : base() { 37 : base() {
60 protected void EnqueueTask(Action unit) { 64 protected void EnqueueTask(Action unit) {
61 Debug.Assert(unit != null); 65 Debug.Assert(unit != null);
62 var len = Interlocked.Increment(ref m_queueLength); 66 var len = Interlocked.Increment(ref m_queueLength);
63 m_queue.Enqueue(unit); 67 m_queue.Enqueue(unit);
64 68
65 if (len > m_threshold*ActiveThreads) 69 if (len > m_threshold * m_workers) {
70 Interlocked.Increment(ref m_workers);
66 GrowPool(); 71 GrowPool();
72 }
67 } 73 }
68 74
69 protected override bool TryDequeue(out Action unit) { 75 protected override bool TryDequeue(out Action unit) {
70 if (m_queue.TryDequeue(out unit)) { 76 if (m_queue.TryDequeue(out unit)) {
71 Interlocked.Decrement(ref m_queueLength); 77 Interlocked.Decrement(ref m_queueLength);
83 // GrowPool? == NO 89 // GrowPool? == NO
84 // ActiveThreads-- 90 // ActiveThreads--
85 // Suspend 91 // Suspend
86 // queueLength > 0 92 // queueLength > 0
87 // continue 93 // continue
94 Thread.MemoryBarrier();
88 if (m_queueLength > 0) 95 if (m_queueLength > 0)
89 return true; 96 return true;
97 Interlocked.Decrement(ref m_workers);
90 return base.Suspend(); 98 return base.Suspend();
91 } 99 }
92 100
93 protected override void InvokeUnit(Action unit) { 101 protected override void InvokeUnit(Action unit) {
94 unit(); 102 unit();