Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 80:4f20870d0816 v2
added memory barriers
| author | cin |
|---|---|
| date | Fri, 26 Sep 2014 03:32:34 +0400 |
| parents | fe33f4e02ad5 |
| children | 2c5631b43c7d |
| rev | line source |
|---|---|
| 12 | 1 using System; |
| 2 using System.Collections.Generic; | |
| 3 using System.Linq; | |
| 4 using System.Text; | |
| 5 using System.Threading; | |
| 6 using System.Diagnostics; | |
| 35 | 7 using Implab.Diagnostics; |
| 12 | 8 |
| 9 namespace Implab.Parallels { | |
| 15 | 10 public class WorkerPool : DispatchPool<Action> { |
| 12 | 11 |
| 15 | 12 MTQueue<Action> m_queue = new MTQueue<Action>(); |
| 13 int m_queueLength = 0; | |
| 16 | 14 readonly int m_threshold = 1; |
| 80 | 15 int m_workers = 0; |
| 13 | 16 |
| 16 | 17 public WorkerPool(int minThreads, int maxThreads, int threshold) |
| 15 | 18 : base(minThreads, maxThreads) { |
| 16 | 19 m_threshold = threshold; |
| 80 | 20 m_workers = minThreads; |
| 16 | 21 InitPool(); |
| 22 } | |
| 23 | |
| 24 public WorkerPool(int minThreads, int maxThreads) : | |
| 25 base(minThreads, maxThreads) { | |
| 80 | 26 m_workers = minThreads; |
| 16 | 27 InitPool(); |
| 13 | 28 } |
| 29 | |
| 15 | 30 public WorkerPool(int threads) |
| 31 : base(threads) { | |
| 80 | 32 m_workers = threads; |
| 16 | 33 InitPool(); |
| 13 | 34 } |
| 35 | |
| 15 | 36 public WorkerPool() |
| 37 : base() { | |
| 16 | 38 InitPool(); |
| 13 | 39 } |
| 40 | |
| 12 | 41 public Promise<T> Invoke<T>(Func<T> task) { |
| 42 if (task == null) | |
| 43 throw new ArgumentNullException("task"); | |
| 15 | 44 if (IsDisposed) |
| 45 throw new ObjectDisposedException(ToString()); | |
| 12 | 46 |
| 47 var promise = new Promise<T>(); | |
| 48 | |
| 40 | 49 var caller = TraceContext.Snapshot(); |
| 35 | 50 |
| 15 | 51 EnqueueTask(delegate() { |
| 40 | 52 caller.Invoke(delegate() { |
| 53 try { | |
| 54 promise.Resolve(task()); | |
| 55 } catch (Exception e) { | |
| 56 promise.Reject(e); | |
| 57 } | |
| 58 }); | |
| 13 | 59 }); |
| 12 | 60 |
| 61 return promise; | |
| 62 } | |
| 63 | |
| 15 | 64 protected void EnqueueTask(Action unit) { |
| 65 Debug.Assert(unit != null); | |
| 16 | 66 var len = Interlocked.Increment(ref m_queueLength); |
| 15 | 67 m_queue.Enqueue(unit); |
| 16 | 68 |
| 80 | 69 if (len > m_threshold * m_workers) { |
| 70 Interlocked.Increment(ref m_workers); | |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
71 GrowPool(); |
| 80 | 72 } |
| 12 | 73 } |
| 74 | |
| 15 | 75 protected override bool TryDequeue(out Action unit) { |
| 76 if (m_queue.TryDequeue(out unit)) { | |
| 77 Interlocked.Decrement(ref m_queueLength); | |
| 78 return true; | |
| 12 | 79 } |
| 15 | 80 return false; |
| 12 | 81 } |
| 82 | |
| 34 | 83 protected override bool Suspend() { |
| 84 // This override solves race condition | |
| 85 // WORKER CLIENT | |
| 86 // --------------------------------------- | |
| 87 // TryDeque == false | |
| 88 // Enqueue(unit), queueLen++ | |
| 89 // GrowPool? == NO | |
| 90 // ActiveThreads-- | |
| 91 // Suspend | |
| 92 // queueLength > 0 | |
| 93 // continue | |
| 80 | 94 Thread.MemoryBarrier(); |
| 34 | 95 if (m_queueLength > 0) |
| 96 return true; | |
| 80 | 97 Interlocked.Decrement(ref m_workers); |
| 34 | 98 return base.Suspend(); |
| 99 } | |
| 100 | |
| 15 | 101 protected override void InvokeUnit(Action unit) { |
| 102 unit(); | |
| 12 | 103 } |
| 16 | 104 |
| 12 | 105 } |
| 106 } |
