Mercurial > pub > ImplabNet
comparison Implab/Parallels/DispatchPool.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | |
children | 5a4b735ba669 |
comparison
equal
deleted
inserted
replaced
14:e943453e5039 | 15:0f982f9b7d4d |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
9 public abstract class DispatchPool<TUnit> : IDisposable { | |
10 readonly int m_minThreads; | |
11 readonly int m_maxThreads; | |
12 int m_runningThreads = 0; | |
13 int m_maxRunningThreads = 0; | |
14 int m_suspended = 0; | |
15 int m_exitRequired = 0; | |
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false); | |
17 | |
18 protected DispatchPool(int min, int max) { | |
19 if (min < 0) | |
20 throw new ArgumentOutOfRangeException("min"); | |
21 if (max <= 0) | |
22 throw new ArgumentOutOfRangeException("max"); | |
23 | |
24 if (min > max) | |
25 min = max; | |
26 m_minThreads = min; | |
27 m_maxThreads = max; | |
28 } | |
29 | |
30 protected DispatchPool(int threads) | |
31 : this(threads, threads) { | |
32 } | |
33 | |
34 protected DispatchPool() { | |
35 int maxThreads, maxCP; | |
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
37 | |
38 m_minThreads = 0; | |
39 m_maxThreads = maxThreads; | |
40 } | |
41 | |
42 protected void InitPool() { | |
43 for (int i = 0; i < m_minThreads; i++) | |
44 StartWorker(); | |
45 } | |
46 | |
47 public int ThreadCount { | |
48 get { | |
49 return m_runningThreads; | |
50 } | |
51 } | |
52 | |
53 public int MaxRunningThreads { | |
54 get { | |
55 return m_maxRunningThreads; | |
56 } | |
57 } | |
58 | |
59 protected bool IsDisposed { | |
60 get { | |
61 return m_exitRequired != 0; | |
62 } | |
63 } | |
64 | |
65 bool StartWorker() { | |
66 var current = m_runningThreads; | |
67 // use spins to allocate slot for the new thread | |
68 do { | |
69 if (current >= m_maxThreads || m_exitRequired != 0) | |
70 // no more slots left or the pool has been disposed | |
71 return false; | |
72 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | |
73 | |
74 m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); | |
75 | |
76 // slot successfully allocated | |
77 | |
78 var worker = new Thread(this.Worker); | |
79 worker.IsBackground = true; | |
80 worker.Start(); | |
81 | |
82 return true; | |
83 } | |
84 | |
85 protected abstract bool TryDequeue(out TUnit unit); | |
86 | |
87 protected virtual void WakeNewWorker() { | |
88 if (m_suspended > 0) | |
89 m_hasTasks.Set(); | |
90 else | |
91 StartWorker(); | |
92 } | |
93 | |
94 bool FetchTask(out TUnit unit) { | |
95 do { | |
96 // exit if requested | |
97 if (m_exitRequired != 0) { | |
98 // release the thread slot | |
99 int running; | |
100 do { | |
101 running = m_runningThreads; | |
102 } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); | |
103 running--; | |
104 | |
105 if (running == 0) // it was the last worker | |
106 m_hasTasks.Dispose(); | |
107 else | |
108 m_hasTasks.Set(); // release next worker | |
109 unit = default(TUnit); | |
110 return false; | |
111 } | |
112 | |
113 // fetch task | |
114 if (TryDequeue(out unit)) { | |
115 WakeNewWorker(); | |
116 return true; | |
117 } | |
118 | |
119 //no tasks left, exit if the thread is no longer needed | |
120 int runningThreads; | |
121 bool exit = true; | |
122 do { | |
123 runningThreads = m_runningThreads; | |
124 if (runningThreads <= m_minThreads) { | |
125 exit = false; | |
126 break; | |
127 } | |
128 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); | |
129 | |
130 if (exit) { | |
131 Interlocked.Decrement(ref m_runningThreads); | |
132 return false; | |
133 } | |
134 | |
135 // keep this thread and wait | |
136 Interlocked.Increment(ref m_suspended); | |
137 m_hasTasks.WaitOne(); | |
138 Interlocked.Decrement(ref m_suspended); | |
139 } while (true); | |
140 } | |
141 | |
142 protected abstract void InvokeUnit(TUnit unit); | |
143 | |
144 void Worker() { | |
145 TUnit unit; | |
146 while (FetchTask(out unit)) | |
147 InvokeUnit(unit); | |
148 } | |
149 | |
150 protected virtual void Dispose(bool disposing) { | |
151 if (disposing) { | |
152 if (m_exitRequired == 0) { | |
153 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) | |
154 return; | |
155 | |
156 // wake sleeping threads | |
157 m_hasTasks.Set(); | |
158 GC.SuppressFinalize(this); | |
159 } | |
160 } | |
161 } | |
162 | |
163 public void Dispose() { | |
164 Dispose(true); | |
165 } | |
166 | |
167 ~DispatchPool() { | |
168 Dispose(false); | |
169 } | |
170 } | |
171 } |