annotate Implab/Parallels/WorkerPool.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents fe33f4e02ad5
children 2c5631b43c7d
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Collections.Generic;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Linq;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
4 using System.Text;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 using System.Diagnostics;
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
7 using Implab.Diagnostics;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
9 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
10 public class WorkerPool : DispatchPool<Action> {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
11
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
12 MTQueue<Action> m_queue = new MTQueue<Action>();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
13 int m_queueLength = 0;
16
cin
parents: 15
diff changeset
14 readonly int m_threshold = 1;
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
15 int m_workers = 0;
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
16
16
cin
parents: 15
diff changeset
17 public WorkerPool(int minThreads, int maxThreads, int threshold)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
18 : base(minThreads, maxThreads) {
16
cin
parents: 15
diff changeset
19 m_threshold = threshold;
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
20 m_workers = minThreads;
16
cin
parents: 15
diff changeset
21 InitPool();
cin
parents: 15
diff changeset
22 }
cin
parents: 15
diff changeset
23
cin
parents: 15
diff changeset
24 public WorkerPool(int minThreads, int maxThreads) :
cin
parents: 15
diff changeset
25 base(minThreads, maxThreads) {
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
26 m_workers = minThreads;
16
cin
parents: 15
diff changeset
27 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
28 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
29
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
30 public WorkerPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
31 : base(threads) {
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
32 m_workers = threads;
16
cin
parents: 15
diff changeset
33 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
34 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
35
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
36 public WorkerPool()
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
37 : base() {
16
cin
parents: 15
diff changeset
38 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
39 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
40
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
41 public Promise<T> Invoke<T>(Func<T> task) {
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
42 if (task == null)
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
43 throw new ArgumentNullException("task");
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
44 if (IsDisposed)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
45 throw new ObjectDisposedException(ToString());
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
46
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
47 var promise = new Promise<T>();
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
48
40
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
49 var caller = TraceContext.Snapshot();
35
2880242f987a initial log capabilities
cin
parents: 34
diff changeset
50
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
51 EnqueueTask(delegate() {
40
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
52 caller.Invoke(delegate() {
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
53 try {
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
54 promise.Resolve(task());
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
55 } catch (Exception e) {
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
56 promise.Reject(e);
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
57 }
fe33f4e02ad5 improved tracing
cin
parents: 36
diff changeset
58 });
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
59 });
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
60
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
61 return promise;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
62 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
63
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
64 protected void EnqueueTask(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
65 Debug.Assert(unit != null);
16
cin
parents: 15
diff changeset
66 var len = Interlocked.Increment(ref m_queueLength);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
67 m_queue.Enqueue(unit);
16
cin
parents: 15
diff changeset
68
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
69 if (len > m_threshold * m_workers) {
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
70 Interlocked.Increment(ref m_workers);
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
71 GrowPool();
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
72 }
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
73 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
74
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
75 protected override bool TryDequeue(out Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
76 if (m_queue.TryDequeue(out unit)) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
77 Interlocked.Decrement(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
78 return true;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
79 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
80 return false;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
81 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
82
34
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
83 protected override bool Suspend() {
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
84 // This override solves race condition
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
85 // WORKER CLIENT
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
86 // ---------------------------------------
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
87 // TryDeque == false
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
88 // Enqueue(unit), queueLen++
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
89 // GrowPool? == NO
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
90 // ActiveThreads--
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
91 // Suspend
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
92 // queueLength > 0
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
93 // continue
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
94 Thread.MemoryBarrier();
34
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
95 if (m_queueLength > 0)
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
96 return true;
80
4f20870d0816 added memory barriers
cin
parents: 40
diff changeset
97 Interlocked.Decrement(ref m_workers);
34
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
98 return base.Suspend();
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
99 }
dabf79fde388 fixed race condition in DispatchPool
cin
parents: 21
diff changeset
100
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
101 protected override void InvokeUnit(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
102 unit();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
103 }
16
cin
parents: 15
diff changeset
104
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
105 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
106 }