annotate Implab/Parallels/DispatchPool.cs @ 33:b255e4aeef17

removed the reference to the parent from the promise object this allows resolved promises to release parents and results they are holding. Added complete set of operations to IPromiseBase interface Subscribing to the cancellation event of the promise should not affect it's IsExclusive property More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 2fad2d1f4b03
children dabf79fde388
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
1 using System;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
2 using System.Collections.Generic;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
3 using System.Linq;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
4 using System.Text;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
5 using System.Threading;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
6 using System.Diagnostics;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
7
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
8 namespace Implab.Parallels {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
9 public abstract class DispatchPool<TUnit> : IDisposable {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
10 readonly int m_minThreads;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
11 readonly int m_maxThreads;
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
12
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
13 int m_createdThreads = 0; // the current size of the pool
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
14 int m_activeThreads = 0; // the count of threads which are active
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
15 int m_sleepingThreads = 0; // the count of currently inactive threads
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
16 int m_maxRunningThreads = 0; // the maximum reached size of the pool
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
17 int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
18 int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
19 int m_wakeEvents = 0; // the count of wake events
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
20
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
21 AutoResetEvent m_hasTasks = new AutoResetEvent(false);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
22
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Creates a pool with the given lower and upper bounds on the number of worker threads.
/// </summary>
/// <param name="min">Minimum number of threads kept in the pool; must not be negative.
/// A value greater than <paramref name="max"/> is silently clamped to <paramref name="max"/>.</param>
/// <param name="max">Maximum number of threads; must be positive.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative or
/// <paramref name="max"/> is not positive.</exception>
protected DispatchPool(int min, int max) {
    if (min < 0)
        throw new ArgumentOutOfRangeException("min");
    if (max <= 0)
        throw new ArgumentOutOfRangeException("max");

    // the lower bound can never exceed the upper bound
    m_minThreads = Math.Min(min, max);
    m_maxThreads = max;
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
34
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Creates a pool whose minimum and maximum thread counts are both <paramref name="threads"/>.
/// </summary>
/// <param name="threads">The fixed number of threads; validated by the two-argument constructor.</param>
protected DispatchPool(int threads) : this(threads, threads) { }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
38
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Creates a pool with no reserved threads and an upper bound taken from the
/// worker-thread limit reported by <see cref="ThreadPool.GetMaxThreads"/>.
/// </summary>
protected DispatchPool() {
    int workerLimit;
    int completionPortLimit; // required by the API, not used here
    ThreadPool.GetMaxThreads(out workerLimit, out completionPortLimit);

    m_maxThreads = workerLimit;
    m_minThreads = 0;
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
46
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Attempts to start the minimum number of workers, calling
/// <c>StartWorker</c> once per reserved thread.
/// </summary>
protected void InitPool() {
    int remaining = m_minThreads;
    while (remaining > 0) {
        StartWorker();
        remaining--;
    }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
51
20
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
/// <summary>
/// The number of thread slots currently allocated in the pool.
/// </summary>
public int PoolSize {
    get { return m_createdThreads; }
}
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
57
1c3b3d518480 refactoring, sync
cin
parents: 17
diff changeset
/// <summary>
/// The number of workers currently counted as active.
/// </summary>
public int ActiveThreads {
    get { return m_activeThreads; }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
63
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// The largest pool size recorded so far.
/// </summary>
public int MaxRunningThreads {
    get { return m_maxRunningThreads; }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
69
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// <c>true</c> once the exit flag has been raised, i.e. the pool is shutting down.
/// </summary>
protected bool IsDisposed {
    get { return 0 != m_exitRequired; }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
75
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Tries to fetch the next unit of work. The worker loop invokes the unit
/// when this returns <c>true</c> and goes to the suspend state otherwise.
/// </summary>
/// <param name="unit">Receives the fetched unit when the call succeeds.</param>
/// <returns><c>true</c> if a unit was fetched.</returns>
76 protected abstract bool TryDequeue(out TUnit unit);
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
77
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
78 #region thread execution traits
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
/// <summary>
/// Registers one more wake signal. The event is set only when the counter
/// goes from zero to one; further signals are just counted.
/// </summary>
/// <returns>The number of pending wake signals after the increment.</returns>
int SignalThread() {
    int pending = Interlocked.Increment(ref m_wakeEvents);
    if (pending == 1) {
        m_hasTasks.Set();
    }
    return pending;
}
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
85
22
5a35900264f5 implemented nonblocking wake singnals processing
cin
parents: 21
diff changeset
/// <summary>
/// Tries to consume one pending wake signal; when there is none, waits on the
/// event and retries until a signal is taken or the timeout expires.
/// </summary>
/// <param name="timeout">Total time to wait in milliseconds; <c>-1</c> waits
/// indefinitely and <c>0</c> only polls.</param>
/// <returns><c>true</c> if a wake signal was consumed; <c>false</c> if the wait timed out.</returns>
bool FetchSignalOrWait(int timeout) {
    var start = Environment.TickCount;

    // The time left is always derived from the original timeout and the total
    // elapsed time. Subtracting the total elapsed time from an already reduced
    // value would shrink the budget too fast after the first wake-up.
    int remaining = timeout;

    // True once this thread has passed through the event wait: it then owns the
    // "lock" and, on successfully taking a signal, has to hand it back so that
    // another waiting thread can proceed.
    bool hasLock = false;
    do {
        int signals;
        // lock-free decrement of the wake counter, never going below zero
        do {
            signals = m_wakeEvents;
            if (signals == 0)
                break;
        } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);

        if (signals >= 1) {
            // more signals are still pending and we consumed the event: re-arm it
            if (signals > 1 && hasLock)
                m_hasTasks.Set();
            return true;
        }

        if (timeout != -1)
            remaining = Math.Max(0, timeout - (Environment.TickCount - start));

        // If no signals are left, the first thread to get here resets the event,
        // makes one empty pass through the loop and then blocks.
        hasLock = true;
    } while (m_hasTasks.WaitOne(remaining));

    return false;
}
5a35900264f5 implemented nonblocking wake singnals processing
cin
parents: 21
diff changeset
117
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
/// <summary>
/// Counts the calling thread as sleeping while it waits for a wake signal.
/// </summary>
/// <param name="timeout">Wait time in milliseconds, passed to <c>FetchSignalOrWait</c>.</param>
/// <returns><c>true</c> if a wake signal was received; <c>false</c> on timeout.</returns>
bool Sleep(int timeout) {
    Interlocked.Increment(ref m_sleepingThreads);
    bool signaled = FetchSignalOrWait(timeout);
    Interlocked.Decrement(ref m_sleepingThreads);
    return signaled;
}
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
128 #endregion
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
129
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Either wakes one sleeping thread or starts a new worker, and makes sure
/// the pool has at least one thread. Does nothing once exit has been requested.
/// </summary>
protected void GrowPool() {
    if (m_exitRequired != 0)
        return;

    // More sleepers than pending wake signals: a sleeping thread can take the work.
    // Otherwise try to start a new worker; if that fails, the current thread may be
    // on its way to terminate and unable to process the queue, so it is signalled
    // to spin again.
    if (m_sleepingThreads > m_wakeEvents || !StartWorker()) {
        SignalThread();

        // We can't check whether the signal has been processed, and all sleeping
        // threads may already be gone, so ensure that at least one thread is running.
        EnsurePoolIsAlive();
    }
}
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 24
diff changeset
157
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 24
diff changeset
/// <summary>
/// Starts a background worker thread if, and only if, the pool currently has no threads.
/// </summary>
private void EnsurePoolIsAlive() {
    // succeeds only when the created-thread counter moves from 0 to 1
    if (!AllocateThreadSlot(1))
        return;

    new Thread(this.Worker) { IsBackground = true }.Start();
}
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
166
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
/// <summary>
/// Called by a worker that found no tasks: waits for new work and decides
/// whether the thread should keep running.
/// </summary>
/// <returns><c>true</c> if the worker should continue its loop; <c>false</c> if it should exit.</returns>
private bool Suspend() {
    // if threads have a timeout before releasing, wait that long for a wake signal
    if (m_releaseTimeout > 0 && Sleep(m_releaseTimeout))
        return true;

    // no signal arrived (or no timeout configured): try to release the unused thread
    bool last;
    if (!ReleaseThreadSlot(out last)) {
        // the slot is reserved, so wait indefinitely
        Sleep(-1);
        return true;
    }

    // A task may have been queued just as the last thread was being released.
    // FetchSignalOrWait(0) either fetches the pending signal or returns false.
    if (!last || !FetchSignalOrWait(0))
        return false;

    // the signal was consumed above, so reschedule it
    SignalThread();
    // stay alive only if this thread can take the single slot back
    return AllocateThreadSlot(1);
}
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
199
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
200 #region thread slots traits
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
201
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Reserves one more thread slot using a compare-and-swap retry loop.
/// </summary>
/// <returns><c>true</c> if a slot was reserved; <c>false</c> if the pool is at its
/// maximum size or exit has been requested.</returns>
bool AllocateThreadSlot() {
    for (;;) {
        int observed = m_createdThreads;

        // no more slots left or the pool has been disposed
        if (observed >= m_maxThreads || m_exitRequired != 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed + 1, observed) == observed) {
            UpdateMaxThreads(observed + 1);
            return true;
        }
        // another thread changed the counter in between: retry
    }
}
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
216
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Reserves a thread slot only when that brings the pool size to exactly
/// <paramref name="desired"/>, i.e. when the current size is <c>desired - 1</c>.
/// </summary>
/// <param name="desired">The pool size expected after the allocation.</param>
/// <returns><c>true</c> if the slot was reserved.</returns>
bool AllocateThreadSlot(int desired) {
    int expected = desired - 1;
    bool acquired = Interlocked.CompareExchange(ref m_createdThreads, desired, expected) == expected;
    if (acquired)
        UpdateMaxThreads(desired);
    return acquired;
}
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
225
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Gives up one thread slot using a compare-and-swap retry loop, unless the slot is reserved.
/// </summary>
/// <param name="last">Set to <c>true</c> when the released slot was the only one left.</param>
/// <returns><c>true</c> if a slot was released; <c>false</c> if the pool is at or below its
/// minimum size and exit has not been requested.</returns>
bool ReleaseThreadSlot(out bool last) {
    last = false;
    for (;;) {
        int observed = m_createdThreads;

        // the thread is reserved
        if (observed <= m_minThreads && m_exitRequired == 0)
            return false;

        if (Interlocked.CompareExchange(ref m_createdThreads, observed - 1, observed) == observed) {
            last = observed == 1;
            return true;
        }
        // another thread changed the counter in between: retry
    }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
241
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Releases a thread slot unconditionally; used during cleanup.
/// </summary>
/// <returns><c>true</c> when no thread slots are left.</returns>
bool ReleaseThreadSlotAnyway() {
    return Interlocked.Decrement(ref m_createdThreads) == 0;
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
250
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Raises the recorded maximum pool size to <paramref name="count"/> if it is
/// currently lower, using a compare-and-swap retry loop.
/// </summary>
/// <param name="count">The pool size that has just been reached.</param>
void UpdateMaxThreads(int count) {
    for (;;) {
        int seen = m_maxRunningThreads;

        // the recorded maximum is already high enough
        if (seen >= count)
            return;

        if (Interlocked.CompareExchange(ref m_maxRunningThreads, count, seen) == seen)
            return;
        // the maximum changed in between: retry
    }
}
cin
parents: 15
diff changeset
259
17
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
260 #endregion
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
261
7cd4a843b4e4 Improved worker pool
cin
parents: 16
diff changeset
/// <summary>
/// Starts a new background worker thread if a thread slot can be allocated.
/// </summary>
/// <returns><c>true</c> if a worker was started; <c>false</c> if no slot was available.</returns>
bool StartWorker() {
    if (!AllocateThreadSlot())
        return false;

    // slot successfully allocated
    new Thread(this.Worker) { IsBackground = true }.Start();
    return true;
}
cin
parents: 15
diff changeset
274
21
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
/// <summary>
/// Processes a single unit of work. The worker loop calls this for every unit
/// that <c>TryDequeue</c> returns.
/// </summary>
/// <param name="unit">The unit fetched from the queue.</param>
275 protected abstract void InvokeUnit(TUnit unit);
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
276
6a56df4ec59e DispatchPool works again, but performance is poor in some cases
cin
parents: 20
diff changeset
/// <summary>
/// Main loop of a pool thread. The thread counts itself as active, then keeps
/// cycling through three steps: honour a pending exit request, run the next
/// unit returned by <c>TryDequeue</c>, or drop out of the active count and
/// call <c>Suspend</c>. The loop ends on an exit request or when
/// <c>Suspend</c> returns <c>false</c>.
/// </summary>
void Worker() {
    // a freshly started worker is counted as active
    Interlocked.Increment(ref m_activeThreads);

    while (true) {
        // exit if requested
        if (m_exitRequired != 0) {
            // leave the active set and release the thread slot
            Interlocked.Decrement(ref m_activeThreads);

            bool lastWorker = ReleaseThreadSlotAnyway();
            if (lastWorker) {
                // this was the last worker, so it disposes m_hasTasks
                m_hasTasks.Dispose();
            } else {
                // wake the next worker
                SignalThread();
            }
            return;
        }

        // fetch and run a task, then immediately look for another one
        TUnit next;
        if (TryDequeue(out next)) {
            InvokeUnit(next);
            continue;
        }

        // nothing was dequeued: stop counting as active before suspending
        Interlocked.Decrement(ref m_activeThreads);

        // keep this thread and wait; a false result ends the worker
        // NOTE(review): presumably false means the thread is being retired - confirm against Suspend()
        if (!Suspend())
            return;

        // resumed: count as active again
        Interlocked.Increment(ref m_activeThreads);
    }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
310
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Requests pool shutdown. Only the first disposing call that switches
/// <c>m_exitRequired</c> from 0 to 1 performs the shutdown steps; every
/// other call returns without side effects.
/// </summary>
/// <param name="disposing">
/// <c>true</c> when invoked through <c>Dispose()</c>; <c>false</c> when invoked
/// from the finalizer, in which case this implementation does nothing.
/// </param>
protected virtual void Dispose(bool disposing) {
    if (!disposing)
        return;

    // plain read first, then an atomic 0 -> 1 switch so that a single caller wins
    if (m_exitRequired != 0 || Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
        return;

    // wake sleeping threads; when no thread was created, dispose m_hasTasks here
    if (m_createdThreads > 0) {
        SignalThread();
    } else {
        m_hasTasks.Dispose();
    }

    GC.SuppressFinalize(this);
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
326
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Public disposal entry point; forwards to <c>Dispose(true)</c>.
/// </summary>
327 public void Dispose() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
328 Dispose(true);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
329 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
330
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Finalizer; forwards to <c>Dispose(false)</c>. The <c>Dispose(bool)</c>
/// declared in this class only acts when its argument is <c>true</c>, so this
/// call has an effect only if an override handles the <c>false</c> case.
/// </summary>
331 ~DispatchPool() {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
332 Dispose(false);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
333 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
334 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
335 }