comparison Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d (branch: promises)
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | b0feb5b9ad1c |
children | 5a4b735ba669 |
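The changeset description mentions parallel map and foreach over arrays; those helpers live in another file of this changeset and are not part of the comparison below. As a rough, hypothetical illustration of how the reworked pool can back such helpers, the sketch below schedules one task per array element through the Invoke<T> method shown in the new revision and collects the resulting promises. Only the WorkerPool(minThreads, maxThreads) constructor and Invoke<T> are taken from the diff; the namespaces, pool size, and the square-root work are assumptions.

    using System;
    using Implab;              // assumed: Promise<T> lives in the Implab namespace
    using Implab.Parallels;

    class ParallelMapSketch {
        static void Main() {
            var pool = new WorkerPool(0, 4);            // at most four worker threads
            double[] source = { 1, 2, 3, 4, 5, 6, 7, 8 };

            // map: one pooled task per element, each result delivered through a promise
            var results = new Promise<double>[source.Length];
            for (int i = 0; i < source.Length; i++) {
                var item = source[i];                   // capture the element, not the loop index
                results[i] = pool.Invoke(() => Math.Sqrt(item));
            }

            // joining/awaiting the promises is omitted: the Promise API beyond
            // Resolve/Reject is not shown in this comparison
        }
    }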
comparison
--- 14:e943453e5039
+++ 15:0f982f9b7d4d
@@ -4,168 +4,66 @@
 using System.Text;
 using System.Threading;
 using System.Diagnostics;
 
 namespace Implab.Parallels {
-    public class WorkerPool : IDisposable {
-        readonly int m_minThreads;
-        readonly int m_maxThreads;
-        int m_runningThreads;
-        object m_lock = new object();
+    public class WorkerPool : DispatchPool<Action> {
 
-        bool m_disposed = false;
+        MTQueue<Action> m_queue = new MTQueue<Action>();
+        int m_queueLength = 0;
 
-        // this event will signal that workers can try to fetch a task from queue or the pool has been disposed
-        ManualResetEvent m_hasTasks = new ManualResetEvent(false);
-        Queue<Action> m_queue = new Queue<Action>();
-
-        public WorkerPool(int min, int max) {
-            if (min < 0)
-                throw new ArgumentOutOfRangeException("min");
-            if (max <= 0)
-                throw new ArgumentOutOfRangeException("max");
-
-            if (min > max)
-                min = max;
-            m_minThreads = min;
-            m_maxThreads = max;
-
-            InitPool();
+        public WorkerPool(int minThreads, int maxThreads)
+            : base(minThreads, maxThreads) {
+            InitPool();
         }
 
-        public WorkerPool(int max)
-            : this(0, max) {
+        public WorkerPool(int threads)
+            : base(threads) {
+            InitPool();
         }
 
-        public WorkerPool() {
-            int maxThreads, maxCP;
-            ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
-
-            m_minThreads = 0;
-            m_maxThreads = maxThreads;
-
-            InitPool();
-        }
-
-        void InitPool() {
-            for (int i = 0; i < m_minThreads; i++)
-                StartWorker();
-        }
-
-        public int ThreadCount {
-            get {
-                return m_runningThreads;
-            }
+        public WorkerPool()
+            : base() {
+            InitPool();
         }
 
         public Promise<T> Invoke<T>(Func<T> task) {
-            if (m_disposed)
-                throw new ObjectDisposedException(ToString());
             if (task == null)
                 throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
 
             var promise = new Promise<T>();
 
-            var queueLen = EnqueueTask(delegate() {
+            EnqueueTask(delegate() {
                 try {
                     promise.Resolve(task());
                 } catch (Exception e) {
                     promise.Reject(e);
                 }
             });
 
-            if (queueLen > 1)
-                StartWorker();
-
             return promise;
         }
 
-        bool StartWorker() {
-            var current = m_runningThreads;
-            // use spins to allocate slot for the new thread
-            do {
-                if (current >= m_maxThreads)
-                    // no more slots left
-                    return false;
-            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
-
-            // slot successfully allocated
-
-            var worker = new Thread(this.Worker);
-            worker.IsBackground = true;
-            worker.Start();
-
-            return true;
+        protected void EnqueueTask(Action unit) {
+            Debug.Assert(unit != null);
+            Interlocked.Increment(ref m_queueLength);
+            m_queue.Enqueue(unit);
+            // if there are sleeping threads in the pool wake one
+            // probably this will lead a dry run
+            WakeNewWorker();
         }
 
-        int EnqueueTask(Action task) {
-            Debug.Assert(task != null);
-            lock (m_queue) {
-                m_queue.Enqueue(task);
-                m_hasTasks.Set();
-                return m_queue.Count;
+        protected override bool TryDequeue(out Action unit) {
+            if (m_queue.TryDequeue(out unit)) {
+                Interlocked.Decrement(ref m_queueLength);
+                return true;
             }
+            return false;
         }
 
-        bool FetchTask(out Action task) {
-            task = null;
-
-            while (true) {
-
-                m_hasTasks.WaitOne();
-
-                if (m_disposed)
-                    return false;
-
-                lock (m_queue) {
-                    if (m_queue.Count > 0) {
-                        task = m_queue.Dequeue();
-                        return true;
-                    }
-
-                    // no tasks left
-                    // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
-                    m_hasTasks.Reset();
-                }
-
-                bool exit = true;
-
-                var current = m_runningThreads;
-                do {
-                    if (current <= m_minThreads) {
-                        exit = false; // this thread should return and wait for the new events
-                        break;
-                    }
-                } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
-
-                if (exit)
-                    return false;
-            }
-        }
-
-        void Worker() {
-            Action task;
-            while (FetchTask(out task))
-                task();
-        }
-
-        protected virtual void Dispose(bool disposing) {
-            if (disposing) {
-                lock (m_lock) {
-                    if (m_disposed)
-                        return;
-                    m_disposed = true;
-                }
-                m_hasTasks.Set();
-                GC.SuppressFinalize(this);
-            }
-        }
-
-        public void Dispose() {
-            Dispose(true);
-        }
-
-        ~WorkerPool() {
-            Dispose(false);
+        protected override void InvokeUnit(Action unit) {
+            unit();
         }
     }
 }
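Two types referenced in the new revision are defined elsewhere in the Implab sources and are not shown on this page: the DispatchPool<Action> base class and MTQueue<Action>. The skeleton below is inferred from the calls the new WorkerPool makes (InitPool, WakeNewWorker, IsDisposed, and the TryDequeue/InvokeUnit overrides); member bodies, the default thread limits, and the ConcurrentQueue-backed MTQueue stand-in are assumptions, not the actual Implab implementation, which uses a custom lock-free queue.

    using System;
    using System.Collections.Concurrent;

    namespace Implab.Parallels {
        // Contract the new WorkerPool appears to program against (inferred, not verbatim).
        public abstract class DispatchPool<TUnit> : IDisposable {
            protected DispatchPool(int min, int max) { /* record thread limits */ }
            protected DispatchPool(int threads) : this(threads, threads) { }
            protected DispatchPool() : this(0, Environment.ProcessorCount) { }  // default limits assumed

            protected bool IsDisposed { get; private set; }

            // start the minimum number of workers; called from derived-class constructors
            protected void InitPool() { /* ... */ }

            // wake a sleeping worker (or start a new one) after a unit of work is queued
            protected void WakeNewWorker() { /* ... */ }

            // derived classes supply the queue and the way a unit of work is executed
            protected abstract bool TryDequeue(out TUnit unit);
            protected abstract void InvokeUnit(TUnit unit);

            public virtual void Dispose() {
                IsDisposed = true;
            }
        }

        // Stand-in exposing only the surface WorkerPool uses; ConcurrentQueue is used
        // here purely for brevity in place of the real lock-free linked queue.
        public class MTQueue<T> {
            readonly ConcurrentQueue<T> m_inner = new ConcurrentQueue<T>();

            public void Enqueue(T value) {
                m_inner.Enqueue(value);
            }

            public bool TryDequeue(out T value) {
                return m_inner.TryDequeue(out value);
            }
        }
    }

Against such a contract, the diff above reduces WorkerPool to little more than a queue plus InvokeUnit: the thread management and disposal logic the old version carried itself (StartWorker, FetchTask, Worker, Dispose) moves to the base class, and the ManualResetEvent-plus-lock pair is replaced by the queue and an Interlocked counter.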