comparison Implab/Parallels/DispatchPool.cs @ 15:0f982f9b7d4d promises

implemented parallel map and foreach for arrays rewritten WorkerPool with MTQueue for more efficiency
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents
children 5a4b735ba669
comparison
equal deleted inserted replaced
14:e943453e5039 15:0f982f9b7d4d
1 using System;
2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Text;
5 using System.Threading;
6 using System.Diagnostics;
7
8 namespace Implab.Parallels {
9 public abstract class DispatchPool<TUnit> : IDisposable {
10 readonly int m_minThreads;
11 readonly int m_maxThreads;
12 int m_runningThreads = 0;
13 int m_maxRunningThreads = 0;
14 int m_suspended = 0;
15 int m_exitRequired = 0;
16 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
17
18 protected DispatchPool(int min, int max) {
19 if (min < 0)
20 throw new ArgumentOutOfRangeException("min");
21 if (max <= 0)
22 throw new ArgumentOutOfRangeException("max");
23
24 if (min > max)
25 min = max;
26 m_minThreads = min;
27 m_maxThreads = max;
28 }
29
30 protected DispatchPool(int threads)
31 : this(threads, threads) {
32 }
33
34 protected DispatchPool() {
35 int maxThreads, maxCP;
36 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
37
38 m_minThreads = 0;
39 m_maxThreads = maxThreads;
40 }
41
42 protected void InitPool() {
43 for (int i = 0; i < m_minThreads; i++)
44 StartWorker();
45 }
46
47 public int ThreadCount {
48 get {
49 return m_runningThreads;
50 }
51 }
52
53 public int MaxRunningThreads {
54 get {
55 return m_maxRunningThreads;
56 }
57 }
58
59 protected bool IsDisposed {
60 get {
61 return m_exitRequired != 0;
62 }
63 }
64
65 bool StartWorker() {
66 var current = m_runningThreads;
67 // use spins to allocate slot for the new thread
68 do {
69 if (current >= m_maxThreads || m_exitRequired != 0)
70 // no more slots left or the pool has been disposed
71 return false;
72 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
73
74 m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1);
75
76 // slot successfully allocated
77
78 var worker = new Thread(this.Worker);
79 worker.IsBackground = true;
80 worker.Start();
81
82 return true;
83 }
84
85 protected abstract bool TryDequeue(out TUnit unit);
86
87 protected virtual void WakeNewWorker() {
88 if (m_suspended > 0)
89 m_hasTasks.Set();
90 else
91 StartWorker();
92 }
93
94 bool FetchTask(out TUnit unit) {
95 do {
96 // exit if requested
97 if (m_exitRequired != 0) {
98 // release the thread slot
99 int running;
100 do {
101 running = m_runningThreads;
102 } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running));
103 running--;
104
105 if (running == 0) // it was the last worker
106 m_hasTasks.Dispose();
107 else
108 m_hasTasks.Set(); // release next worker
109 unit = default(TUnit);
110 return false;
111 }
112
113 // fetch task
114 if (TryDequeue(out unit)) {
115 WakeNewWorker();
116 return true;
117 }
118
119 //no tasks left, exit if the thread is no longer needed
120 int runningThreads;
121 bool exit = true;
122 do {
123 runningThreads = m_runningThreads;
124 if (runningThreads <= m_minThreads) {
125 exit = false;
126 break;
127 }
128 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads));
129
130 if (exit) {
131 Interlocked.Decrement(ref m_runningThreads);
132 return false;
133 }
134
135 // keep this thread and wait
136 Interlocked.Increment(ref m_suspended);
137 m_hasTasks.WaitOne();
138 Interlocked.Decrement(ref m_suspended);
139 } while (true);
140 }
141
142 protected abstract void InvokeUnit(TUnit unit);
143
144 void Worker() {
145 TUnit unit;
146 while (FetchTask(out unit))
147 InvokeUnit(unit);
148 }
149
150 protected virtual void Dispose(bool disposing) {
151 if (disposing) {
152 if (m_exitRequired == 0) {
153 if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
154 return;
155
156 // wake sleeping threads
157 m_hasTasks.Set();
158 GC.SuppressFinalize(this);
159 }
160 }
161 }
162
163 public void Dispose() {
164 Dispose(true);
165 }
166
167 ~DispatchPool() {
168 Dispose(false);
169 }
170 }
171 }