annotate Implab/Parallels/WorkerPool.cs @ 20:1c3b3d518480 promises

refactoring, sync
author cin
date Tue, 12 Nov 2013 02:27:22 +0400
parents 7cd4a843b4e4
children 6a56df4ec59e
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Collections.Generic;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Linq;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
4 using System.Text;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 using System.Diagnostics;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
7
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
/// <summary>
/// A dispatch pool whose units of work are <see cref="Action"/> delegates.
/// Work is placed into an internal queue and picked up by the workers managed
/// by the base <c>DispatchPool</c>.
/// </summary>
public class WorkerPool : DispatchPool<Action> {

    // Pending units of work waiting to be dequeued by a worker.
    readonly MTQueue<Action> m_queue = new MTQueue<Action>();

    // Number of pending units. Incremented before a unit is enqueued and
    // decremented after a unit has been dequeued, always through Interlocked.
    int m_queueLength = 0;

    // The pool is asked to grow only while the queue length exceeds
    // m_threshold * ActiveThreads (see ExtendPool).
    readonly int m_threshold = 1;

    /// <summary>
    /// Creates a pool with the given thread limits and growth threshold.
    /// </summary>
    /// <param name="minThreads">Passed to the base constructor.</param>
    /// <param name="maxThreads">Passed to the base constructor.</param>
    /// <param name="threshold">
    /// Queue length per active thread that has to be exceeded before the pool
    /// is asked to grow.
    /// </param>
    public WorkerPool(int minThreads, int maxThreads, int threshold)
        : base(minThreads, maxThreads) {
        m_threshold = threshold;
        // NOTE(review): InitPool is defined by the base class; presumably it
        // starts the initial workers - confirm against DispatchPool.
        InitPool();
    }

    /// <summary>
    /// Creates a pool with the given thread limits and the default threshold of 1.
    /// </summary>
    /// <param name="minThreads">Passed to the base constructor.</param>
    /// <param name="maxThreads">Passed to the base constructor.</param>
    public WorkerPool(int minThreads, int maxThreads)
        : base(minThreads, maxThreads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool from a single thread-count argument and the default threshold of 1.
    /// </summary>
    /// <param name="threads">Passed to the base constructor.</param>
    public WorkerPool(int threads)
        : base(threads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool with the base class defaults and the default threshold of 1.
    /// </summary>
    public WorkerPool()
        : base() {
        InitPool();
    }

    /// <summary>
    /// Queues <paramref name="task"/> for execution by the pool and returns a
    /// promise for its result.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the task.</typeparam>
    /// <param name="task">The function to run on a worker.</param>
    /// <returns>
    /// A promise that is resolved with the value returned by the task, or
    /// rejected with the exception caught while running it.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (task == null) {
            throw new ArgumentNullException("task");
        }
        if (IsDisposed) {
            throw new ObjectDisposedException(ToString());
        }

        var promise = new Promise<T>();

        EnqueueTask(delegate() {
            try {
                promise.Resolve(task());
            } catch (Exception e) {
                // Any failure is reported through the promise instead of
                // escaping from the queued unit.
                promise.Reject(e);
            }
        });

        return promise;
    }

    /// <summary>
    /// Adds a unit of work to the queue and asks the pool to grow if needed.
    /// </summary>
    /// <param name="unit">The unit of work; must not be null.</param>
    protected void EnqueueTask(Action unit) {
        Debug.Assert(unit != null);

        // The counter is raised before the unit becomes visible in the queue.
        Interlocked.Increment(ref m_queueLength);
        m_queue.Enqueue(unit);

        ExtendPool();
    }

    /// <summary>
    /// Forwards the request for additional workers to the base class only when
    /// the queue is longer than the threshold multiplied by the number of
    /// active threads.
    /// </summary>
    /// <returns>
    /// <c>false</c> when the queue is not long enough to justify growing;
    /// otherwise the result of the base implementation.
    /// </returns>
    protected override bool ExtendPool() {
        if (m_queueLength <= m_threshold * ActiveThreads) {
            return false;
        }
        return base.ExtendPool();
    }

    /// <summary>
    /// Tries to take the next unit of work from the queue.
    /// </summary>
    /// <param name="unit">Receives the dequeued unit when the call succeeds.</param>
    /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out Action unit) {
        if (m_queue.TryDequeue(out unit)) {
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Runs a dequeued unit of work by invoking the delegate.
    /// </summary>
    /// <param name="unit">The unit of work to run.</param>
    protected override void InvokeUnit(Action unit) {
        unit();
    }

    /// <summary>
    /// Delegates to the base implementation only when the pending counter is
    /// zero; otherwise returns <c>true</c> immediately.
    /// </summary>
    /// <returns>
    /// The result of the base implementation when no units are pending;
    /// otherwise <c>true</c>.
    /// </returns>
    protected override bool Suspend() {
        if (m_queueLength == 0) {
            return base.Suspend();
        }
        return true; // spin again without locks...
    }
}
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
90 }