Mercurial > pub > ImplabNet
view Implab/Parallels/DispatchPool.cs @ 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | 2fc0fbe7d58b |
children | 2c5631b43c7d |
line wrap: on
line source
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Diagnostics; namespace Implab.Parallels { public abstract class DispatchPool<TUnit> : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit int m_createdThreads = 0; // the current size of the pool int m_activeThreads = 0; // the count of threads which are active int m_sleepingThreads = 0; // the count of currently inactive threads int m_maxRunningThreads = 0; // the meximum reached size of the pool int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released int m_wakeEvents = 0; // the count of wake events readonly object m_signalLocker = new object(); protected DispatchPool(int min, int max) { if (min < 0) throw new ArgumentOutOfRangeException("min"); if (max <= 0) throw new ArgumentOutOfRangeException("max"); if (min > max) min = max; m_minThreads = min; m_maxThreads = max; } protected DispatchPool(int threads) : this(threads, threads) { } protected DispatchPool() { int maxThreads, maxCP; ThreadPool.GetMaxThreads(out maxThreads, out maxCP); m_minThreads = 0; m_maxThreads = maxThreads; } protected void InitPool() { for (int i = 0; i < m_minThreads; i++) StartWorker(); } public int PoolSize { get { Thread.MemoryBarrier(); return m_createdThreads; } } public int ActiveThreads { get { Thread.MemoryBarrier(); return m_activeThreads; } } public int MaxRunningThreads { get { Thread.MemoryBarrier(); return m_maxRunningThreads; } } protected bool IsDisposed { get { Thread.MemoryBarrier(); return m_exitRequired == 1; } } protected abstract bool TryDequeue(out TUnit unit); #region thread signaling traits int SignalThread() { var signals = Interlocked.Increment(ref m_wakeEvents); if(signals == 1) lock(m_signalLocker) Monitor.Pulse(m_signalLocker); return signals; } bool FetchSignalOrWait(int timeout) { var start = Environment.TickCount; int signals; Thread.MemoryBarrier(); // m_wakeEvents volatile first read do { signals = m_wakeEvents; if (signals == 0) break; } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); if (signals == 0) { // no signal is fetched lock(m_signalLocker) { while(m_wakeEvents == 0) { if (timeout != -1) timeout = Math.Max(0, timeout - (Environment.TickCount - start)); if(!Monitor.Wait(m_signalLocker,timeout)) return false; // timeout } // m_wakeEvents > 0 if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized Monitor.Pulse(m_signalLocker); // signal fetched return true; } } else { // signal fetched return true; } } bool Sleep(int timeout) { Interlocked.Increment(ref m_sleepingThreads); if (FetchSignalOrWait(timeout)) { Interlocked.Decrement(ref m_sleepingThreads); return true; } else { Interlocked.Decrement(ref m_sleepingThreads); return false; } } #endregion /// <summary> /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока /// </summary> protected void GrowPool() { Thread.MemoryBarrier(); if (m_exitRequired == 1) return; if (m_sleepingThreads > m_wakeEvents) { //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); // all sleeping threads may gone SignalThread(); // wake a sleeping thread; // we can't check whether signal has been processed // anyway it may take some time for the thread to start // we will ensure that at least one thread is running EnsurePoolIsAlive(); } else { // if there is no sleeping threads in the pool if (!StartWorker()) { // we haven't started a new thread, but the current can be on the way to terminate and it can't process the queue // send it a signal to spin again SignalThread(); EnsurePoolIsAlive(); } } } protected void EnsurePoolIsAlive() { if (AllocateThreadSlot(1)) { // if there were no threads in the pool var worker = new Thread(this.Worker); worker.IsBackground = true; worker.Start(); } } protected virtual bool Suspend() { //no tasks left, exit if the thread is no longer needed bool last; bool requestExit; // if threads have a timeout before releasing if (m_releaseTimeout > 0) requestExit = !Sleep(m_releaseTimeout); else requestExit = true; if (!requestExit) return true; // release unsused thread if (requestExit && ReleaseThreadSlot(out last)) { // in case at the moment the last thread was being released // a new task was added to the queue, we need to try // to revoke the thread to avoid the situation when the task is left unprocessed if (last && FetchSignalOrWait(0)) { // FetchSignalOrWait(0) will fetch pending task or will return false SignalThread(); // since FetchSignalOrWait(0) has fetched the signal we need to reschedule it return AllocateThreadSlot(1); // ensure that at least one thread is alive } return false; } // wait till infinity Sleep(-1); return true; } #region thread slots traits bool AllocateThreadSlot() { int current; // use spins to allocate slot for the new thread do { current = m_createdThreads; if (current >= m_maxThreads || m_exitRequired == 1) // no more slots left or the pool has been disposed return false; } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); UpdateMaxThreads(current + 1); return true; } bool AllocateThreadSlot(int desired) { if (desired - 1 != Interlocked.CompareExchange(ref m_createdThreads, desired, desired - 1)) return false; UpdateMaxThreads(desired); return true; } bool ReleaseThreadSlot(out bool last) { last = false; int current; // use spins to release slot for the new thread Thread.MemoryBarrier(); do { current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) // the thread is reserved return false; } while (current != Interlocked.CompareExchange(ref m_createdThreads, current - 1, current)); last = (current == 1); return true; } /// <summary> /// releases thread slot unconditionally, used during cleanup /// </summary> /// <returns>true - no more threads left</returns> bool ReleaseThreadSlotAnyway() { var left = Interlocked.Decrement(ref m_createdThreads); return left == 0; } void UpdateMaxThreads(int count) { int max; do { max = m_maxRunningThreads; if (max >= count) break; } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); } #endregion bool StartWorker() { if (AllocateThreadSlot()) { // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; Interlocked.Increment(ref m_activeThreads); worker.Start(); return true; } else { return false; } } protected abstract void InvokeUnit(TUnit unit); protected virtual void Worker() { TUnit unit; //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); int count = 0;; Thread.MemoryBarrier(); do { // exit if requested if (m_exitRequired == 1) { // release the thread slot Interlocked.Decrement(ref m_activeThreads); if (!ReleaseThreadSlotAnyway()) // it was the last worker SignalThread(); // wake next worker break; } // fetch task if (TryDequeue(out unit)) { InvokeUnit(unit); count ++; continue; } Interlocked.Decrement(ref m_activeThreads); Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); // entering suspend state // keep this thread and wait if (!Suspend()) break; count = 0; //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); //Console.WriteLine("{0}: Exited", Thread.CurrentThread.ManagedThreadId); } protected virtual void Dispose(bool disposing) { if (disposing) { if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier // wake sleeping threads if (m_createdThreads > 0) SignalThread(); GC.SuppressFinalize(this); } } } public void Dispose() { Dispose(true); } ~DispatchPool() { Dispose(false); } } }