12
|
1 using System;
|
|
2 using System.Collections.Generic;
|
|
3 using System.Linq;
|
|
4 using System.Text;
|
|
5 using System.Threading;
|
|
6 using System.Diagnostics;
|
|
7
|
|
namespace Implab.Parallels {
    /// <summary>
    /// A dispatch pool whose units of work are <see cref="Action"/> delegates
    /// stored in an internal queue.
    /// </summary>
    public class WorkerPool : DispatchPool<Action> {

        // Work items that have been enqueued and not yet handed out by TryDequeue.
        MTQueue<Action> m_queue = new MTQueue<Action>();

        // Number of enqueued-but-not-dequeued items; only modified through Interlocked.
        int m_queueLength;

        /// <summary>
        /// Creates the pool, forwarding both limits to the base class and then
        /// calling <c>InitPool</c>.
        /// </summary>
        /// <param name="minThreads">Forwarded unchanged to the base constructor.</param>
        /// <param name="maxThreads">Forwarded unchanged to the base constructor.</param>
        public WorkerPool(int minThreads, int maxThreads)
            : base(minThreads, maxThreads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool, forwarding <paramref name="threads"/> to the base class
        /// and then calling <c>InitPool</c>.
        /// </summary>
        /// <param name="threads">Forwarded unchanged to the base constructor.</param>
        public WorkerPool(int threads)
            : base(threads) {
            InitPool();
        }

        /// <summary>
        /// Creates the pool using the parameterless base constructor and then
        /// calls <c>InitPool</c>.
        /// </summary>
        public WorkerPool() {
            InitPool();
        }

        /// <summary>
        /// Queues <paramref name="task"/> for execution by the pool and returns a
        /// promise for its outcome.
        /// </summary>
        /// <typeparam name="T">Type of the value produced by the task.</typeparam>
        /// <param name="task">The function to run; must not be <c>null</c>.</param>
        /// <returns>
        /// A promise that is resolved with the value returned by
        /// <paramref name="task"/>, or rejected with the exception caught while
        /// running it and resolving the promise.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
        /// <exception cref="ObjectDisposedException">The pool reports <c>IsDisposed</c>.</exception>
        public Promise<T> Invoke<T>(Func<T> task) {
            if (task == null) {
                throw new ArgumentNullException("task");
            }
            if (IsDisposed) {
                throw new ObjectDisposedException(ToString());
            }

            var result = new Promise<T>();

            // The wrapper converts the task's outcome into a promise state, so an
            // exception caught here is reported through the promise rather than
            // propagating to whoever invokes the unit.
            Action unit = () => {
                try {
                    result.Resolve(task());
                } catch (Exception error) {
                    result.Reject(error);
                }
            };
            EnqueueTask(unit);

            return result;
        }

        /// <summary>
        /// Adds a unit of work to the queue and signals the pool.
        /// </summary>
        /// <param name="unit">The work item; expected to be non-null (checked in debug builds only).</param>
        protected void EnqueueTask(Action unit) {
            Debug.Assert(unit != null);

            // The counter is bumped before the item becomes visible in the queue.
            Interlocked.Increment(ref m_queueLength);
            m_queue.Enqueue(unit);

            // If the pool has sleeping threads, wake one of them. This may well
            // result in a dry run for the woken thread.
            WakeNewWorker();
        }

        /// <summary>
        /// Attempts to take the next unit of work from the queue.
        /// </summary>
        /// <param name="unit">Receives the dequeued work item when the call succeeds.</param>
        /// <returns><c>true</c> if an item was dequeued; otherwise <c>false</c>.</returns>
        protected override bool TryDequeue(out Action unit) {
            if (!m_queue.TryDequeue(out unit)) {
                return false;
            }

            // Keep the length counter in step with the successful dequeue.
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }

        /// <summary>
        /// Executes a unit of work by invoking the delegate directly.
        /// </summary>
        /// <param name="unit">The work item to run.</param>
        protected override void InvokeUnit(Action unit) {
            unit();
        }
    }
}
|