annotate Implab/Parallels/WorkerPool.cs @ 33:b255e4aeef17

removed the reference to the parent from the promise object this allows resolved promises to release parents and results they are holding. Added complete set of operations to IPromiseBase interface Subscribing to the cancellation event of the promise should not affect it's IsExclusive property More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 6a56df4ec59e
children dabf79fde388
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Collections.Generic;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Linq;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
4 using System.Text;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 using System.Diagnostics;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
7
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
9 public class WorkerPool : DispatchPool<Action> {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
10
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
11 MTQueue<Action> m_queue = new MTQueue<Action>();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
12 int m_queueLength = 0;
16
cin
parents: 15
diff changeset
13 readonly int m_threshold = 1;
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
14
16
cin
parents: 15
diff changeset
15 public WorkerPool(int minThreads, int maxThreads, int threshold)
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
16 : base(minThreads, maxThreads) {
16
cin
parents: 15
diff changeset
17 m_threshold = threshold;
cin
parents: 15
diff changeset
18 InitPool();
cin
parents: 15
diff changeset
19 }
cin
parents: 15
diff changeset
20
cin
parents: 15
diff changeset
21 public WorkerPool(int minThreads, int maxThreads) :
cin
parents: 15
diff changeset
22 base(minThreads, maxThreads) {
cin
parents: 15
diff changeset
23 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
24 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
25
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
26 public WorkerPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
27 : base(threads) {
16
cin
parents: 15
diff changeset
28 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
29 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
30
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
31 public WorkerPool()
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
32 : base() {
16
cin
parents: 15
diff changeset
33 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
34 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
35
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
36 public Promise<T> Invoke<T>(Func<T> task) {
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
37 if (task == null)
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
38 throw new ArgumentNullException("task");
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
39 if (IsDisposed)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
40 throw new ObjectDisposedException(ToString());
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
41
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
42 var promise = new Promise<T>();
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
43
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
44 EnqueueTask(delegate() {
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
45 try {
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
46 promise.Resolve(task());
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
47 } catch (Exception e) {
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
48 promise.Reject(e);
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
49 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
50 });
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
51
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
52 return promise;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
53 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
54
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
55 protected void EnqueueTask(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
56 Debug.Assert(unit != null);
16
cin
parents: 15
diff changeset
57 var len = Interlocked.Increment(ref m_queueLength);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
58 m_queue.Enqueue(unit);
16
cin
parents: 15
diff changeset
59
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
60 if (len > m_threshold*ActiveThreads)
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
61 GrowPool();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
62 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
63
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
64 protected override bool TryDequeue(out Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
65 if (m_queue.TryDequeue(out unit)) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
66 Interlocked.Decrement(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
67 return true;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
68 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
69 return false;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
70 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
71
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
72 protected override void InvokeUnit(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
73 unit();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
74 }
16
cin
parents: 15
diff changeset
75
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
76 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
77 }