comparison Implab/Parallels/WorkerPool.cs @ 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400
parents 7cd4a843b4e4
children 1c3b3d518480
comparison
equal deleted inserted replaced
6:dfa21d507bc5 18:0c924dff5498
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
/// <summary>
/// A dispatch pool whose work units are <see cref="Action"/> delegates.
/// Units are kept in an internal queue and a separate counter tracks how many
/// of them are pending, which drives the decisions to grow or suspend the pool.
/// </summary>
public class WorkerPool : DispatchPool<Action> {

    // Pending work units; assigned once and never replaced.
    readonly MTQueue<Action> m_queue = new MTQueue<Action>();

    // Number of pending units. Modified only through Interlocked operations:
    // raised before a unit is enqueued and lowered after a successful dequeue.
    int m_queueLength = 0;

    // Queue-length factor used by ExtendPool; defaults to 1 when the
    // constructor in use does not supply a value.
    readonly int m_threshold = 1;

    /// <summary>
    /// Creates a pool with the given thread limits and growth threshold.
    /// </summary>
    /// <param name="minThreads">Passed to the base pool constructor.</param>
    /// <param name="maxThreads">Passed to the base pool constructor.</param>
    /// <param name="threshold">
    /// The pool is asked to grow only when the number of pending units exceeds
    /// <paramref name="threshold"/> multiplied by <c>ThreadCount</c>.
    /// </param>
    public WorkerPool(int minThreads, int maxThreads, int threshold)
        : base(minThreads, maxThreads) {
        m_threshold = threshold;
        // NOTE(review): InitPool is defined in the base class; presumably it starts
        // the initial workers, so it is called only after m_threshold is set.
        InitPool();
    }

    /// <summary>
    /// Creates a pool with the given thread limits and the default threshold of 1.
    /// </summary>
    /// <param name="minThreads">Passed to the base pool constructor.</param>
    /// <param name="maxThreads">Passed to the base pool constructor.</param>
    public WorkerPool(int minThreads, int maxThreads)
        : base(minThreads, maxThreads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool using the single-argument base constructor and the default
    /// threshold of 1.
    /// </summary>
    /// <param name="threads">Passed to the base pool constructor.</param>
    public WorkerPool(int threads)
        : base(threads) {
        InitPool();
    }

    /// <summary>
    /// Creates a pool using the base class defaults and the default threshold of 1.
    /// </summary>
    public WorkerPool()
        : base() {
        InitPool();
    }

    /// <summary>
    /// Queues <paramref name="task"/> for execution by the pool and returns a promise
    /// that is resolved with the task's result, or rejected with the exception the
    /// task throws.
    /// </summary>
    /// <typeparam name="T">Type of the value produced by the task.</typeparam>
    /// <param name="task">The function to execute; must not be <c>null</c>.</param>
    /// <returns>The promise bound to the outcome of <paramref name="task"/>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is <c>null</c>.</exception>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (task == null)
            throw new ArgumentNullException("task");
        if (IsDisposed)
            throw new ObjectDisposedException(ToString());

        var promise = new Promise<T>();

        EnqueueTask(delegate() {
            // Any failure of the task is routed to the promise instead of
            // escaping into the code that invokes the unit.
            try {
                promise.Resolve(task());
            } catch (Exception e) {
                promise.Reject(e);
            }
        });

        return promise;
    }

    /// <summary>
    /// Adds a unit to the queue, then tries to extend the pool; if the pool was not
    /// extended, wakes it instead.
    /// </summary>
    /// <param name="unit">The unit to enqueue; must not be <c>null</c>.</param>
    protected void EnqueueTask(Action unit) {
        Debug.Assert(unit != null);

        // Count the unit before it becomes visible in the queue; TryDequeue lowers
        // the counter only after it has actually removed a unit.
        Interlocked.Increment(ref m_queueLength);
        m_queue.Enqueue(unit);

        if (!ExtendPool())
            WakePool();
    }

    /// <summary>
    /// Requests an additional worker, but only when the queue is longer than
    /// the threshold multiplied by <c>ThreadCount</c>.
    /// </summary>
    /// <returns>
    /// <c>false</c> when the queue is not longer than that limit; otherwise the result
    /// of the base implementation.
    /// </returns>
    protected override bool ExtendPool() {
        // The request for more workers is honoured only when the queue has grown
        // beyond the configured threshold per thread.
        if (m_queueLength <= m_threshold * ThreadCount)
            return false;
        return base.ExtendPool();
    }

    /// <summary>
    /// Attempts to take the next unit from the queue.
    /// </summary>
    /// <param name="unit">Receives the dequeued unit when the method returns <c>true</c>.</param>
    /// <returns><c>true</c> if a unit was dequeued; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out Action unit) {
        if (m_queue.TryDequeue(out unit)) {
            Interlocked.Decrement(ref m_queueLength);
            return true;
        }
        return false;
    }

    /// <summary>
    /// Executes a unit by invoking the delegate.
    /// </summary>
    /// <param name="unit">The delegate to invoke.</param>
    protected override void InvokeUnit(Action unit) {
        unit();
    }

    /// <summary>
    /// Delegates to the base implementation only when the pending-unit counter is
    /// zero; otherwise returns immediately.
    /// </summary>
    protected override void Suspend() {
        if (m_queueLength == 0)
            base.Suspend();
    }
}
89 }