comparison Implab/Parallels/DispatchPool.cs @ 22:5a35900264f5 promises

implemented nonblocking wake singnals processing
author cin
date Wed, 13 Nov 2013 14:03:00 +0400
parents 6a56df4ec59e
children ee04e1fa78da
comparison
equal deleted inserted replaced
21:6a56df4ec59e 22:5a35900264f5
81 if(signals == 1) 81 if(signals == 1)
82 m_hasTasks.Set(); 82 m_hasTasks.Set();
83 return signals; 83 return signals;
84 } 84 }
85 85
86 bool FetchSignalOrWait(int timeout) {
87 var start = Environment.TickCount;
88
89 // означает, что поток владеет блокировкой и при успешном получении сигнала должен
90 // ее вернуть, чтобы другой ожидающий поток смог
91 bool hasLock = false;
92 do {
93 int signals;
94 do {
95 signals = m_wakeEvents;
96 if (signals == 0)
97 break;
98 } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
99
100 if (signals >= 1) {
101 if (signals > 1 && hasLock)
102 m_hasTasks.Set();
103 return true;
104 }
105
106 if (timeout != -1)
107 timeout = Math.Max(0, timeout - (Environment.TickCount - start));
108
109 // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
110 // и уйдет на пустой цикл, после чего заблокируется
111
112 hasLock = true;
113 } while (m_hasTasks.WaitOne(timeout));
114
115 return false;
116 }
117
86 bool Sleep(int timeout) { 118 bool Sleep(int timeout) {
87 Interlocked.Increment(ref m_sleepingThreads); 119 Interlocked.Increment(ref m_sleepingThreads);
88 if (m_hasTasks.WaitOne(timeout)) { 120 if (FetchSignalOrWait(timeout)) {
89 // this is autoreset event, only one thread can run this block simultaneously 121 Interlocked.Decrement(ref m_sleepingThreads);
90 var sleeping = Interlocked.Decrement(ref m_sleepingThreads);
91 if (Interlocked.Decrement(ref m_wakeEvents) > 0)
92 m_hasTasks.Set(); // wake next worker
93
94 return true; 122 return true;
95 } else { 123 } else {
96 Interlocked.Decrement(ref m_sleepingThreads); 124 Interlocked.Decrement(ref m_sleepingThreads);
97 return false; 125 return false;
98 } 126 }
104 /// </summary> 132 /// </summary>
105 protected void GrowPool() { 133 protected void GrowPool() {
106 if (m_exitRequired != 0) 134 if (m_exitRequired != 0)
107 return; 135 return;
108 if (m_sleepingThreads > m_wakeEvents) { 136 if (m_sleepingThreads > m_wakeEvents) {
137 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
138
109 // all sleeping threads may gone 139 // all sleeping threads may gone
110 SignalThread(); // wake a sleeping thread; 140 SignalThread(); // wake a sleeping thread;
111 141
112 // we can't check whether signal has been processed 142 // we can't check whether signal has been processed
113 // anyway it may take some time for the thread to start 143 // anyway it may take some time for the thread to start
127 157
128 private bool Suspend() { 158 private bool Suspend() {
129 //no tasks left, exit if the thread is no longer needed 159 //no tasks left, exit if the thread is no longer needed
130 bool last; 160 bool last;
131 bool requestExit; 161 bool requestExit;
132
133
134 162
135 // if threads have a timeout before releasing 163 // if threads have a timeout before releasing
136 if (m_releaseTimeout > 0) 164 if (m_releaseTimeout > 0)
137 requestExit = !Sleep(m_releaseTimeout); 165 requestExit = !Sleep(m_releaseTimeout);
138 else 166 else
240 268
241 protected abstract void InvokeUnit(TUnit unit); 269 protected abstract void InvokeUnit(TUnit unit);
242 270
243 void Worker() { 271 void Worker() {
244 TUnit unit; 272 TUnit unit;
273 //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
245 Interlocked.Increment(ref m_activeThreads); 274 Interlocked.Increment(ref m_activeThreads);
246 Sleep(0); // remove wake request if the new thread is started
247 do { 275 do {
248 // exit if requested 276 // exit if requested
249 if (m_exitRequired != 0) { 277 if (m_exitRequired != 0) {
250 // release the thread slot 278 // release the thread slot
251 Interlocked.Decrement(ref m_activeThreads); 279 Interlocked.Decrement(ref m_activeThreads);
267 295
268 // entering suspend state 296 // entering suspend state
269 // keep this thread and wait 297 // keep this thread and wait
270 if (!Suspend()) 298 if (!Suspend())
271 break; 299 break;
272 300 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
273 Interlocked.Increment(ref m_activeThreads); 301 Interlocked.Increment(ref m_activeThreads);
274 } while (true); 302 } while (true);
275 303 //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId);
276 } 304 }
277 305
278 protected virtual void Dispose(bool disposing) { 306 protected virtual void Dispose(bool disposing) {
279 if (disposing) { 307 if (disposing) {
280 if (m_exitRequired == 0) { 308 if (m_exitRequired == 0) {