comparison Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d promises

Implemented parallel map and foreach for arrays; rewrote WorkerPool on top of MTQueue for better efficiency.
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents b0feb5b9ad1c
children 5a4b735ba669
comparison
equal deleted inserted replaced
14:e943453e5039 15:0f982f9b7d4d
4 using System.Text; 4 using System.Text;
5 using System.Threading; 5 using System.Threading;
6 using System.Diagnostics; 6 using System.Diagnostics;
7 7
8 namespace Implab.Parallels { 8 namespace Implab.Parallels {
9 public class WorkerPool : IDisposable { 9 public class WorkerPool : DispatchPool<Action> {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads;
13 object m_lock = new object();
14 10
15 bool m_disposed = false; 11 MTQueue<Action> m_queue = new MTQueue<Action>();
12 int m_queueLength = 0;
16 13
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed 14 public WorkerPool(int minThreads, int maxThreads)
18 ManualResetEvent m_hasTasks = new ManualResetEvent(false); 15 : base(minThreads, maxThreads) {
19 Queue<Action> m_queue = new Queue<Action>(); 16 InitPool();
20
21 public WorkerPool(int min, int max) {
22 if (min < 0)
23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
27 if (min > max)
28 min = max;
29 m_minThreads = min;
30 m_maxThreads = max;
31
32 InitPool();
33 } 17 }
34 18
35 public WorkerPool(int max) 19 public WorkerPool(int threads)
36 : this(0, max) { 20 : base(threads) {
21 InitPool();
37 } 22 }
38 23
39 public WorkerPool() { 24 public WorkerPool()
40 int maxThreads, maxCP; 25 : base() {
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); 26 InitPool();
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
50 for (int i = 0; i < m_minThreads; i++)
51 StartWorker();
52 }
53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
58 } 27 }
59 28
60 public Promise<T> Invoke<T>(Func<T> task) { 29 public Promise<T> Invoke<T>(Func<T> task) {
61 if (m_disposed)
62 throw new ObjectDisposedException(ToString());
63 if (task == null) 30 if (task == null)
64 throw new ArgumentNullException("task"); 31 throw new ArgumentNullException("task");
32 if (IsDisposed)
33 throw new ObjectDisposedException(ToString());
65 34
66 var promise = new Promise<T>(); 35 var promise = new Promise<T>();
67 36
68 var queueLen = EnqueueTask(delegate() { 37 EnqueueTask(delegate() {
69 try { 38 try {
70 promise.Resolve(task()); 39 promise.Resolve(task());
71 } catch (Exception e) { 40 } catch (Exception e) {
72 promise.Reject(e); 41 promise.Reject(e);
73 } 42 }
74 }); 43 });
75 44
76 if (queueLen > 1)
77 StartWorker();
78
79 return promise; 45 return promise;
80 } 46 }
81 47
82 bool StartWorker() { 48 protected void EnqueueTask(Action unit) {
83 var current = m_runningThreads; 49 Debug.Assert(unit != null);
84 // use spins to allocate slot for the new thread 50 Interlocked.Increment(ref m_queueLength);
85 do { 51 m_queue.Enqueue(unit);
86 if (current >= m_maxThreads) 52 // if there are sleeping threads in the pool wake one
87 // no more slots left 53 // probably this will lead to a dry run
88 return false; 54 WakeNewWorker();
89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
90
91 // slot successfully allocated
92
93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
95 worker.Start();
96
97 return true;
98 } 55 }
99 56
100 int EnqueueTask(Action task) { 57 protected override bool TryDequeue(out Action unit) {
101 Debug.Assert(task != null); 58 if (m_queue.TryDequeue(out unit)) {
102 lock (m_queue) { 59 Interlocked.Decrement(ref m_queueLength);
103 m_queue.Enqueue(task); 60 return true;
104 m_hasTasks.Set();
105 return m_queue.Count;
106 } 61 }
62 return false;
107 } 63 }
108 64
109 bool FetchTask(out Action task) { 65 protected override void InvokeUnit(Action unit) {
110 task = null; 66 unit();
111
112 while (true) {
113
114 m_hasTasks.WaitOne();
115
116 if (m_disposed)
117 return false;
118
119 lock (m_queue) {
120 if (m_queue.Count > 0) {
121 task = m_queue.Dequeue();
122 return true;
123 }
124
125 // no tasks left
126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
127 m_hasTasks.Reset();
128 }
129
130 bool exit = true;
131
132 var current = m_runningThreads;
133 do {
134 if (current <= m_minThreads) {
135 exit = false; // this thread should return and wait for the new events
136 break;
137 }
138 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
139
140 if (exit)
141 return false;
142 }
143 }
144
145 void Worker() {
146 Action task;
147 while (FetchTask(out task))
148 task();
149 }
150
151 protected virtual void Dispose(bool disposing) {
152 if (disposing) {
153 lock (m_lock) {
154 if (m_disposed)
155 return;
156 m_disposed = true;
157 }
158 m_hasTasks.Set();
159 GC.SuppressFinalize(this);
160 }
161 }
162
163 public void Dispose() {
164 Dispose(true);
165 }
166
167 ~WorkerPool() {
168 Dispose(false);
169 } 67 }
170 } 68 }
171 } 69 }