comparison Implab/Parallels/WorkerPool.cs @ 13:b0feb5b9ad1c promises

small fixes, WorkerPool still incomplete
author cin
date Wed, 06 Nov 2013 01:07:55 +0400
parents eb418ba8275b
children 0f982f9b7d4d
comparison
equal deleted inserted replaced
12:eb418ba8275b 13:b0feb5b9ad1c
11 readonly int m_maxThreads; 11 readonly int m_maxThreads;
12 int m_runningThreads; 12 int m_runningThreads;
13 object m_lock = new object(); 13 object m_lock = new object();
14 14
15 bool m_disposed = false; 15 bool m_disposed = false;
16
17 // this event signals that workers can try to fetch a task from the queue, or that the pool has been disposed
16 ManualResetEvent m_hasTasks = new ManualResetEvent(false); 18 ManualResetEvent m_hasTasks = new ManualResetEvent(false);
17 Queue<Action> m_queue = new Queue<Action>(); 19 Queue<Action> m_queue = new Queue<Action>();
18 20
19 public WorkerPool(int min, int max) { 21 public WorkerPool(int min, int max) {
20 if (min < 0) 22 if (min < 0)
21 throw new ArgumentOutOfRangeException("min"); 23 throw new ArgumentOutOfRangeException("min");
24 if (max <= 0)
25 throw new ArgumentOutOfRangeException("max");
26
22 if (min > max) 27 if (min > max)
23 min = max; 28 min = max;
24 m_minThreads = min; 29 m_minThreads = min;
25 m_maxThreads = max; 30 m_maxThreads = max;
26 31
32 InitPool();
33 }
34
35 public WorkerPool(int max)
36 : this(0, max) {
37 }
38
39 public WorkerPool() {
40 int maxThreads, maxCP;
41 ThreadPool.GetMaxThreads(out maxThreads, out maxCP);
42
43 m_minThreads = 0;
44 m_maxThreads = maxThreads;
45
46 InitPool();
47 }
48
49 void InitPool() {
27 for (int i = 0; i < m_minThreads; i++) 50 for (int i = 0; i < m_minThreads; i++)
28 StartWorker(); 51 StartWorker();
52 }
53
54 public int ThreadCount {
55 get {
56 return m_runningThreads;
57 }
29 } 58 }
30 59
31 public Promise<T> Invoke<T>(Func<T> task) { 60 public Promise<T> Invoke<T>(Func<T> task) {
32 if (m_disposed) 61 if (m_disposed)
33 throw new ObjectDisposedException(ToString()); 62 throw new ObjectDisposedException(ToString());
34 if (task == null) 63 if (task == null)
35 throw new ArgumentNullException("task"); 64 throw new ArgumentNullException("task");
36 65
37 var promise = new Promise<T>(); 66 var promise = new Promise<T>();
38 67
68 var queueLen = EnqueueTask(delegate() {
69 try {
70 promise.Resolve(task());
71 } catch (Exception e) {
72 promise.Reject(e);
73 }
74 });
39 75
76 if (queueLen > 1)
77 StartWorker();
40 78
41 return promise; 79 return promise;
42 } 80 }
43 81
44 bool StartWorker() { 82 bool StartWorker() {
51 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); 89 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
52 90
53 // slot successfully allocated 91 // slot successfully allocated
54 92
55 var worker = new Thread(this.Worker); 93 var worker = new Thread(this.Worker);
94 worker.IsBackground = true;
56 worker.Start(); 95 worker.Start();
57 96
58 return true; 97 return true;
59 } 98 }
60 99
61 void EnqueueTask(Action task) { 100 int EnqueueTask(Action task) {
62 Debug.Assert(task != null); 101 Debug.Assert(task != null);
63 lock (m_queue) { 102 lock (m_queue) {
64 m_queue.Enqueue(task); 103 m_queue.Enqueue(task);
65 m_hasTasks.Set(); 104 m_hasTasks.Set();
105 return m_queue.Count;
66 } 106 }
67 } 107 }
68 108
69 bool FetchTask(out Action task) { 109 bool FetchTask(out Action task) {
70 task = null; 110 task = null;
81 task = m_queue.Dequeue(); 121 task = m_queue.Dequeue();
82 return true; 122 return true;
83 } 123 }
84 124
85 // no tasks left 125 // no tasks left
86 // signal that no more tasks left, lock ensures that this event won't suppress newly added task 126 // signal that no more tasks left, current lock ensures that this event won't suppress newly added task
87 m_hasTasks.Reset(); 127 m_hasTasks.Reset();
88 } 128 }
89 129
90 bool exit = true; 130 bool exit = true;
91 131