annotate Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents b0feb5b9ad1c
children 5a4b735ba669
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
1 using System;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
2 using System.Collections.Generic;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
3 using System.Linq;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
4 using System.Text;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
5 using System.Threading;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
6 using System.Diagnostics;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
7
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
8 namespace Implab.Parallels {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
9 public class WorkerPool : DispatchPool<Action> {
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
10
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
11 MTQueue<Action> m_queue = new MTQueue<Action>();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
12 int m_queueLength = 0;
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
13
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
14 public WorkerPool(int minThreads, int maxThreads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
15 : base(minThreads, maxThreads) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
16 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
17 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
18
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
19 public WorkerPool(int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
20 : base(threads) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
21 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
22 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
23
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
24 public WorkerPool()
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
25 : base() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
26 InitPool();
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
27 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
28
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
29 public Promise<T> Invoke<T>(Func<T> task) {
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
30 if (task == null)
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
31 throw new ArgumentNullException("task");
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
32 if (IsDisposed)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
33 throw new ObjectDisposedException(ToString());
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
34
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
35 var promise = new Promise<T>();
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
36
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
37 EnqueueTask(delegate() {
13
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
38 try {
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
39 promise.Resolve(task());
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
40 } catch (Exception e) {
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
41 promise.Reject(e);
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
42 }
b0feb5b9ad1c small fixes, WorkerPool still incomplete
cin
parents: 12
diff changeset
43 });
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
44
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
45 return promise;
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
46 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
47
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
48 protected void EnqueueTask(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
49 Debug.Assert(unit != null);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
50 Interlocked.Increment(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
51 m_queue.Enqueue(unit);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
52 // if there are sleeping threads in the pool wake one
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
53 // probably this will lead a dry run
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
54 WakeNewWorker();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
55 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
56
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
57 protected override bool TryDequeue(out Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
58 if (m_queue.TryDequeue(out unit)) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
59 Interlocked.Decrement(ref m_queueLength);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
60 return true;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
61 }
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
62 return false;
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
63 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
64
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
65 protected override void InvokeUnit(Action unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents: 13
diff changeset
66 unit();
12
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
67 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
68 }
eb418ba8275b refactoring, added WorkerPool
cin
parents:
diff changeset
69 }