Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
| author | cin |
|---|---|
| date | Thu, 07 Nov 2013 03:41:32 +0400 |
| parents | b0feb5b9ad1c |
| children | 5a4b735ba669 |
comparison
equal
deleted
inserted
replaced
| 14:e943453e5039 | 15:0f982f9b7d4d |
|---|---|
| 4 using System.Text; | 4 using System.Text; |
| 5 using System.Threading; | 5 using System.Threading; |
| 6 using System.Diagnostics; | 6 using System.Diagnostics; |
| 7 | 7 |
| 8 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
| 9 public class WorkerPool : IDisposable { | 9 public class WorkerPool : DispatchPool<Action> { |
| 10 readonly int m_minThreads; | |
| 11 readonly int m_maxThreads; | |
| 12 int m_runningThreads; | |
| 13 object m_lock = new object(); | |
| 14 | 10 |
| 15 bool m_disposed = false; | 11 MTQueue<Action> m_queue = new MTQueue<Action>(); |
| 12 int m_queueLength = 0; | |
| 16 | 13 |
| 17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed | 14 public WorkerPool(int minThreads, int maxThreads) |
| 18 ManualResetEvent m_hasTasks = new ManualResetEvent(false); | 15 : base(minThreads, maxThreads) { |
| 19 Queue<Action> m_queue = new Queue<Action>(); | 16 InitPool(); |
| 20 | |
| 21 public WorkerPool(int min, int max) { | |
| 22 if (min < 0) | |
| 23 throw new ArgumentOutOfRangeException("min"); | |
| 24 if (max <= 0) | |
| 25 throw new ArgumentOutOfRangeException("max"); | |
| 26 | |
| 27 if (min > max) | |
| 28 min = max; | |
| 29 m_minThreads = min; | |
| 30 m_maxThreads = max; | |
| 31 | |
| 32 InitPool(); | |
| 33 } | 17 } |
| 34 | 18 |
| 35 public WorkerPool(int max) | 19 public WorkerPool(int threads) |
| 36 : this(0, max) { | 20 : base(threads) { |
| 21 InitPool(); | |
| 37 } | 22 } |
| 38 | 23 |
| 39 public WorkerPool() { | 24 public WorkerPool() |
| 40 int maxThreads, maxCP; | 25 : base() { |
| 41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | 26 InitPool(); |
| 42 | |
| 43 m_minThreads = 0; | |
| 44 m_maxThreads = maxThreads; | |
| 45 | |
| 46 InitPool(); | |
| 47 } | |
| 48 | |
| 49 void InitPool() { | |
| 50 for (int i = 0; i < m_minThreads; i++) | |
| 51 StartWorker(); | |
| 52 } | |
| 53 | |
| 54 public int ThreadCount { | |
| 55 get { | |
| 56 return m_runningThreads; | |
| 57 } | |
| 58 } | 27 } |
| 59 | 28 |
| 60 public Promise<T> Invoke<T>(Func<T> task) { | 29 public Promise<T> Invoke<T>(Func<T> task) { |
| 61 if (m_disposed) | |
| 62 throw new ObjectDisposedException(ToString()); | |
| 63 if (task == null) | 30 if (task == null) |
| 64 throw new ArgumentNullException("task"); | 31 throw new ArgumentNullException("task"); |
| 32 if (IsDisposed) | |
| 33 throw new ObjectDisposedException(ToString()); | |
| 65 | 34 |
| 66 var promise = new Promise<T>(); | 35 var promise = new Promise<T>(); |
| 67 | 36 |
| 68 var queueLen = EnqueueTask(delegate() { | 37 EnqueueTask(delegate() { |
| 69 try { | 38 try { |
| 70 promise.Resolve(task()); | 39 promise.Resolve(task()); |
| 71 } catch (Exception e) { | 40 } catch (Exception e) { |
| 72 promise.Reject(e); | 41 promise.Reject(e); |
| 73 } | 42 } |
| 74 }); | 43 }); |
| 75 | 44 |
| 76 if (queueLen > 1) | |
| 77 StartWorker(); | |
| 78 | |
| 79 return promise; | 45 return promise; |
| 80 } | 46 } |
| 81 | 47 |
| 82 bool StartWorker() { | 48 protected void EnqueueTask(Action unit) { |
| 83 var current = m_runningThreads; | 49 Debug.Assert(unit != null); |
| 84 // use spins to allocate slot for the new thread | 50 Interlocked.Increment(ref m_queueLength); |
| 85 do { | 51 m_queue.Enqueue(unit); |
| 86 if (current >= m_maxThreads) | 52 // if there are sleeping threads in the pool wake one |
| 87 // no more slots left | 53 // probably this will lead a dry run |
| 88 return false; | 54 WakeNewWorker(); |
| 89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | |
| 90 | |
| 91 // slot successfully allocated | |
| 92 | |
| 93 var worker = new Thread(this.Worker); | |
| 94 worker.IsBackground = true; | |
| 95 worker.Start(); | |
| 96 | |
| 97 return true; | |
| 98 } | 55 } |
| 99 | 56 |
| 100 int EnqueueTask(Action task) { | 57 protected override bool TryDequeue(out Action unit) { |
| 101 Debug.Assert(task != null); | 58 if (m_queue.TryDequeue(out unit)) { |
| 102 lock (m_queue) { | 59 Interlocked.Decrement(ref m_queueLength); |
| 103 m_queue.Enqueue(task); | 60 return true; |
| 104 m_hasTasks.Set(); | |
| 105 return m_queue.Count; | |
| 106 } | 61 } |
| 62 return false; | |
| 107 } | 63 } |
| 108 | 64 |
| 109 bool FetchTask(out Action task) { | 65 protected override void InvokeUnit(Action unit) { |
| 110 task = null; | 66 unit(); |
| 111 | |
| 112 while (true) { | |
| 113 | |
| 114 m_hasTasks.WaitOne(); | |
| 115 | |
| 116 if (m_disposed) | |
| 117 return false; | |
| 118 | |
| 119 lock (m_queue) { | |
| 120 if (m_queue.Count > 0) { | |
| 121 task = m_queue.Dequeue(); | |
| 122 return true; | |
| 123 } | |
| 124 | |
| 125 // no tasks left | |
| 126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task | |
| 127 m_hasTasks.Reset(); | |
| 128 } | |
| 129 | |
| 130 bool exit = true; | |
| 131 | |
| 132 var current = m_runningThreads; | |
| 133 do { | |
| 134 if (current <= m_minThreads) { | |
| 135 exit = false; // this thread should return and wait for the new events | |
| 136 break; | |
| 137 } | |
| 138 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); | |
| 139 | |
| 140 if (exit) | |
| 141 return false; | |
| 142 } | |
| 143 } | |
| 144 | |
| 145 void Worker() { | |
| 146 Action task; | |
| 147 while (FetchTask(out task)) | |
| 148 task(); | |
| 149 } | |
| 150 | |
| 151 protected virtual void Dispose(bool disposing) { | |
| 152 if (disposing) { | |
| 153 lock (m_lock) { | |
| 154 if (m_disposed) | |
| 155 return; | |
| 156 m_disposed = true; | |
| 157 } | |
| 158 m_hasTasks.Set(); | |
| 159 GC.SuppressFinalize(this); | |
| 160 } | |
| 161 } | |
| 162 | |
| 163 public void Dispose() { | |
| 164 Dispose(true); | |
| 165 } | |
| 166 | |
| 167 ~WorkerPool() { | |
| 168 Dispose(false); | |
| 169 } | 67 } |
| 170 } | 68 } |
| 171 } | 69 } |
