Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 18:0c924dff5498
Слияние с promises
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:27:04 +0400 |
parents | 7cd4a843b4e4 |
children | 1c3b3d518480 |
comparison
equal
deleted
inserted
replaced
6:dfa21d507bc5 | 18:0c924dff5498 |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
/// <summary>
/// A pool of worker threads that executes queued <see cref="Action"/> units.
/// Functions submitted through <see cref="Invoke{T}"/> report their outcome
/// via a <see cref="Promise{T}"/>.
/// </summary>
/// <remarks>
/// Thread management (<c>InitPool</c>, <c>WakePool</c>, <c>ThreadCount</c>,
/// <c>IsDisposed</c>) is inherited from <see cref="DispatchPool{TUnit}"/>; this class
/// supplies only the work queue and the policy for growing and suspending the pool.
/// </remarks>
public class WorkerPool : DispatchPool<Action> {

    // Pending work units.
    readonly MTQueue<Action> m_queue = new MTQueue<Action>();

    // Number of queued units, maintained with Interlocked operations alongside m_queue.
    int m_queueLength = 0;

    // Backlog allowed per thread before the pool is asked to grow (see ExtendPool).
    readonly int m_threshold = 1;

    /// <summary>
    /// Creates a pool with explicit thread limits and a custom growth threshold.
    /// </summary>
    /// <param name="minThreads">Passed to the base class as the minimum thread count.</param>
    /// <param name="maxThreads">Passed to the base class as the maximum thread count.</param>
    /// <param name="threshold">
    /// Multiplied by <c>ThreadCount</c> to obtain the queue length above which
    /// <see cref="ExtendPool"/> forwards the request to the base class.
    /// </param>
    public WorkerPool(int minThreads, int maxThreads, int threshold)
        : base(minThreads, maxThreads) {
        m_threshold = threshold;
        InitPool();
    }

    /// <summary>
    /// Creates a pool with explicit thread limits and the default threshold of 1.
    /// </summary>
    /// <param name="minThreads">Passed to the base class as the minimum thread count.</param>
    /// <param name="maxThreads">Passed to the base class as the maximum thread count.</param>
    public WorkerPool(int minThreads, int maxThreads)
        : base(minThreads, maxThreads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool, forwarding <paramref name="threads"/> to the base class,
    /// with the default threshold of 1.
    /// </summary>
    /// <param name="threads">Thread count argument for the base constructor.</param>
    public WorkerPool(int threads)
        : base(threads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool with the base class defaults and the default threshold of 1.
    /// </summary>
    public WorkerPool()
        : base() {
        InitPool();
    }

    /// <summary>
    /// Schedules <paramref name="task"/> for execution on the pool.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the task.</typeparam>
    /// <param name="task">The function to execute.</param>
    /// <returns>
    /// A promise that is resolved with the task's return value, or rejected with
    /// the exception the task threw.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (task == null)
            throw new ArgumentNullException("task");
        if (IsDisposed)
            throw new ObjectDisposedException(ToString());

        var promise = new Promise<T>();

        // Any exception thrown by the task is routed to the promise instead of
        // propagating out of the queued unit.
        EnqueueTask(() => {
            try {
                promise.Resolve(task());
            } catch (Exception e) {
                promise.Reject(e);
            }
        });

        return promise;
    }

    /// <summary>
    /// Adds a unit of work to the queue, then either grows the pool or wakes it.
    /// </summary>
    /// <param name="unit">The work item; must not be null.</param>
    protected void EnqueueTask(Action unit) {
        Debug.Assert(unit != null);

        // The counter is bumped before the item is enqueued and decremented only
        // after a successful dequeue, so it never under-reports the queue.
        Interlocked.Increment(ref m_queueLength);
        m_queue.Enqueue(unit);

        // When the pool was not extended, wake it so the new unit gets picked up.
        if (!ExtendPool())
            WakePool();
    }

    /// <summary>
    /// Forwards the extension request to the base class only when the queue is
    /// longer than <c>m_threshold * ThreadCount</c>.
    /// </summary>
    /// <returns>
    /// <c>false</c> when the backlog is within the threshold; otherwise the result
    /// of the base implementation.
    /// </returns>
    protected override bool ExtendPool() {
        // A request for additional workers is satisfied only when the queue has
        // outgrown the threshold.
        if (m_queueLength <= m_threshold * ThreadCount)
            return false;
        return base.ExtendPool();
    }

    /// <summary>
    /// Attempts to take the next unit of work from the queue.
    /// </summary>
    /// <param name="unit">Receives the dequeued unit on success.</param>
    /// <returns><c>true</c> when a unit was dequeued; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out Action unit) {
        if (m_queue.TryDequeue(out unit)) {
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Executes a dequeued unit of work.
    /// </summary>
    /// <param name="unit">The unit to run.</param>
    protected override void InvokeUnit(Action unit) {
        unit();
    }

    /// <summary>
    /// Delegates to the base <c>Suspend</c> only when the queue length counter
    /// is zero; otherwise returns immediately.
    /// </summary>
    protected override void Suspend() {
        if (m_queueLength == 0)
            base.Suspend();
    }
}
89 } |