Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 13:b0feb5b9ad1c promises
small fixes, WorkerPool still incomplete
author | cin |
---|---|
date | Wed, 06 Nov 2013 01:07:55 +0400 |
parents | eb418ba8275b |
children | 0f982f9b7d4d |
comparison legend: equal, deleted, inserted, replaced
12:eb418ba8275b | 13:b0feb5b9ad1c |
---|---|
11 readonly int m_maxThreads; | 11 readonly int m_maxThreads; |
12 int m_runningThreads; | 12 int m_runningThreads; |
13 object m_lock = new object(); | 13 object m_lock = new object(); |
14 | 14 |
15 bool m_disposed = false; | 15 bool m_disposed = false; |
16 | |
17 // this event will signal that workers can try to fetch a task from queue or the pool has been disposed | |
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false); | 18 ManualResetEvent m_hasTasks = new ManualResetEvent(false); |
17 Queue<Action> m_queue = new Queue<Action>(); | 19 Queue<Action> m_queue = new Queue<Action>(); |
18 | 20 |
19 public WorkerPool(int min, int max) { | 21 public WorkerPool(int min, int max) { |
20 if (min < 0) | 22 if (min < 0) |
21 throw new ArgumentOutOfRangeException("min"); | 23 throw new ArgumentOutOfRangeException("min"); |
24 if (max <= 0) | |
25 throw new ArgumentOutOfRangeException("max"); | |
26 | |
22 if (min > max) | 27 if (min > max) |
23 min = max; | 28 min = max; |
24 m_minThreads = min; | 29 m_minThreads = min; |
25 m_maxThreads = max; | 30 m_maxThreads = max; |
26 | 31 |
32 InitPool(); | |
33 } | |
34 | |
35 public WorkerPool(int max) | |
36 : this(0, max) { | |
37 } | |
38 | |
39 public WorkerPool() { | |
40 int maxThreads, maxCP; | |
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP); | |
42 | |
43 m_minThreads = 0; | |
44 m_maxThreads = maxThreads; | |
45 | |
46 InitPool(); | |
47 } | |
48 | |
49 void InitPool() { | |
27 for (int i = 0; i < m_minThreads; i++) | 50 for (int i = 0; i < m_minThreads; i++) |
28 StartWorker(); | 51 StartWorker(); |
52 } | |
53 | |
54 public int ThreadCount { | |
55 get { | |
56 return m_runningThreads; | |
57 } | |
29 } | 58 } |
30 | 59 |
31 public Promise<T> Invoke<T>(Func<T> task) { | 60 public Promise<T> Invoke<T>(Func<T> task) { |
32 if (m_disposed) | 61 if (m_disposed) |
33 throw new ObjectDisposedException(ToString()); | 62 throw new ObjectDisposedException(ToString()); |
34 if (task == null) | 63 if (task == null) |
35 throw new ArgumentNullException("task"); | 64 throw new ArgumentNullException("task"); |
36 | 65 |
37 var promise = new Promise<T>(); | 66 var promise = new Promise<T>(); |
38 | 67 |
68 var queueLen = EnqueueTask(delegate() { | |
69 try { | |
70 promise.Resolve(task()); | |
71 } catch (Exception e) { | |
72 promise.Reject(e); | |
73 } | |
74 }); | |
39 | 75 |
76 if (queueLen > 1) | |
77 StartWorker(); | |
40 | 78 |
41 return promise; | 79 return promise; |
42 } | 80 } |
43 | 81 |
44 bool StartWorker() { | 82 bool StartWorker() { |
51 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); | 89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); |
52 | 90 |
53 // slot successfully allocated | 91 // slot successfully allocated |
54 | 92 |
55 var worker = new Thread(this.Worker); | 93 var worker = new Thread(this.Worker); |
94 worker.IsBackground = true; | |
56 worker.Start(); | 95 worker.Start(); |
57 | 96 |
58 return true; | 97 return true; |
59 } | 98 } |
60 | 99 |
61 void EnqueueTask(Action task) { | 100 int EnqueueTask(Action task) { |
62 Debug.Assert(task != null); | 101 Debug.Assert(task != null); |
63 lock (m_queue) { | 102 lock (m_queue) { |
64 m_queue.Enqueue(task); | 103 m_queue.Enqueue(task); |
65 m_hasTasks.Set(); | 104 m_hasTasks.Set(); |
105 return m_queue.Count; | |
66 } | 106 } |
67 } | 107 } |
68 | 108 |
69 bool FetchTask(out Action task) { | 109 bool FetchTask(out Action task) { |
70 task = null; | 110 task = null; |
81 task = m_queue.Dequeue(); | 121 task = m_queue.Dequeue(); |
82 return true; | 122 return true; |
83 } | 123 } |
84 | 124 |
85 // no tasks left | 125 // no tasks left |
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task | 126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task |
87 m_hasTasks.Reset(); | 127 m_hasTasks.Reset(); |
88 } | 128 } |
89 | 129 |
90 bool exit = true; | 130 bool exit = true; |
91 | 131 |