comparison Implab/Parallels/WorkerPool.cs @ 81:2c5631b43c7d v2

dispatch pool rewritten
author cin
date Fri, 26 Sep 2014 20:44:01 +0400
parents 4f20870d0816
children 4c0e5ef99986
comparison
equal deleted inserted replaced
80:4f20870d0816 81:2c5631b43c7d
64 protected void EnqueueTask(Action unit) { 64 protected void EnqueueTask(Action unit) {
65 Debug.Assert(unit != null); 65 Debug.Assert(unit != null);
66 var len = Interlocked.Increment(ref m_queueLength); 66 var len = Interlocked.Increment(ref m_queueLength);
67 m_queue.Enqueue(unit); 67 m_queue.Enqueue(unit);
68 68
69 if (len > m_threshold * m_workers) { 69 if (len > m_threshold * PoolSize) {
70 Interlocked.Increment(ref m_workers); 70 StartWorker();
71 GrowPool();
72 } 71 }
72
73 SignalThread();
73 } 74 }
74 75
75 protected override bool TryDequeue(out Action unit) { 76 protected override bool TryDequeue(out Action unit) {
76 if (m_queue.TryDequeue(out unit)) { 77 if (m_queue.TryDequeue(out unit)) {
77 Interlocked.Decrement(ref m_queueLength); 78 Interlocked.Decrement(ref m_queueLength);
78 return true; 79 return true;
79 } 80 }
80 return false; 81 return false;
81 } 82 }
82 83
83 protected override bool Suspend() {
84 // This override solves race condition
85 // WORKER CLIENT
86 // ---------------------------------------
87 // TryDeque == false
88 // Enqueue(unit), queueLen++
89 // GrowPool? == NO
90 // ActiveThreads--
91 // Suspend
92 // queueLength > 0
93 // continue
94 Thread.MemoryBarrier();
95 if (m_queueLength > 0)
96 return true;
97 Interlocked.Decrement(ref m_workers);
98 return base.Suspend();
99 }
100
101 protected override void InvokeUnit(Action unit) { 84 protected override void InvokeUnit(Action unit) {
102 unit(); 85 unit();
103 } 86 }
104 87
105 } 88 }