Mercurial > pub > ImplabNet
annotate Implab/Parallels/WorkerPool.cs @ 35:2880242f987a diagnostics
initial log capabilities
| author | cin |
|---|---|
| date | Mon, 14 Apr 2014 18:25:26 +0400 |
| parents | dabf79fde388 |
| children | 313f708a50e9 |
| rev | line source |
|---|---|
| 12 | 1 using System; |
| 2 using System.Collections.Generic; | |
| 3 using System.Linq; | |
| 4 using System.Text; | |
| 5 using System.Threading; | |
| 6 using System.Diagnostics; | |
| 35 | 7 using Implab.Diagnostics; |
| 12 | 8 |
| 9 namespace Implab.Parallels { | |
| 15 | 10 public class WorkerPool : DispatchPool<Action> { |
| 12 | 11 |
| 15 | 12 MTQueue<Action> m_queue = new MTQueue<Action>(); |
| 13 int m_queueLength = 0; | |
| 16 | 14 readonly int m_threshold = 1; |
| 13 | 15 |
| 16 | 16 public WorkerPool(int minThreads, int maxThreads, int threshold) |
| 15 | 17 : base(minThreads, maxThreads) { |
| 16 | 18 m_threshold = threshold; |
| 19 InitPool(); | |
| 20 } | |
| 21 | |
| 22 public WorkerPool(int minThreads, int maxThreads) : | |
| 23 base(minThreads, maxThreads) { | |
| 24 InitPool(); | |
| 13 | 25 } |
| 26 | |
| 15 | 27 public WorkerPool(int threads) |
| 28 : base(threads) { | |
| 16 | 29 InitPool(); |
| 13 | 30 } |
| 31 | |
| 15 | 32 public WorkerPool() |
| 33 : base() { | |
| 16 | 34 InitPool(); |
| 13 | 35 } |
| 36 | |
| 12 | 37 public Promise<T> Invoke<T>(Func<T> task) { |
| 38 if (task == null) | |
| 39 throw new ArgumentNullException("task"); | |
| 15 | 40 if (IsDisposed) |
| 41 throw new ObjectDisposedException(ToString()); | |
| 12 | 42 |
| 43 var promise = new Promise<T>(); | |
| 44 | |
| 35 | 45 var caller = LogContext.Current; |
| 46 | |
| 15 | 47 EnqueueTask(delegate() { |
| 35 | 48 Log.Transfer(caller); |
| 13 | 49 try { |
| 50 promise.Resolve(task()); | |
| 51 } catch (Exception e) { | |
| 52 promise.Reject(e); | |
| 53 } | |
| 54 }); | |
| 12 | 55 |
| 56 return promise; | |
| 57 } | |
| 58 | |
| 15 | 59 protected void EnqueueTask(Action unit) { |
| 60 Debug.Assert(unit != null); | |
| 16 | 61 var len = Interlocked.Increment(ref m_queueLength); |
| 15 | 62 m_queue.Enqueue(unit); |
| 16 | 63 |
|
21
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
64 if (len > m_threshold*ActiveThreads) |
|
6a56df4ec59e
DispatchPool works again, but performance is poor in some cases
cin
parents:
20
diff
changeset
|
65 GrowPool(); |
| 12 | 66 } |
| 67 | |
| 15 | 68 protected override bool TryDequeue(out Action unit) { |
| 69 if (m_queue.TryDequeue(out unit)) { | |
| 70 Interlocked.Decrement(ref m_queueLength); | |
| 71 return true; | |
| 12 | 72 } |
| 15 | 73 return false; |
| 12 | 74 } |
| 75 | |
| 34 | 76 protected override bool Suspend() { |
| 77 // This override solves race condition | |
| 78 // WORKER CLIENT | |
| 79 // --------------------------------------- | |
| 80 // TryDeque == false | |
| 81 // Enqueue(unit), queueLen++ | |
| 82 // GrowPool? == NO | |
| 83 // ActiveThreads-- | |
| 84 // Suspend | |
| 85 // queueLength > 0 | |
| 86 // continue | |
| 87 if (m_queueLength > 0) | |
| 88 return true; | |
| 89 return base.Suspend(); | |
| 90 } | |
| 91 | |
| 15 | 92 protected override void InvokeUnit(Action unit) { |
| 93 unit(); | |
| 12 | 94 } |
| 16 | 95 |
| 12 | 96 } |
| 97 } |
