Mercurial > pub > ImplabNet
comparison Implab/Parallels/WorkerPool.cs @ 12:eb418ba8275b promises
refactoring, added WorkerPool
author | cin |
---|---|
date | Tue, 05 Nov 2013 19:55:34 +0400 |
parents | |
children | b0feb5b9ad1c |
comparison
equal
deleted
inserted
replaced
11:6ec82bf68c8e | 12:eb418ba8275b |
---|---|
1 using System; | |
2 using System.Collections.Generic; | |
3 using System.Linq; | |
4 using System.Text; | |
5 using System.Threading; | |
6 using System.Diagnostics; | |
7 | |
8 namespace Implab.Parallels { | |
/// <summary>
/// A pool of dedicated threads that execute queued <see cref="Action"/> delegates.
/// The pool starts <c>min</c> workers in the constructor and never allows more
/// than <c>max</c> workers to be registered at the same time. Idle workers above
/// the minimum leave the pool when the queue is empty.
/// </summary>
public class WorkerPool : IDisposable {
    readonly int m_minThreads;
    readonly int m_maxThreads;
    // number of worker slots currently allocated; modified only with Interlocked operations
    int m_runningThreads;
    // guards the one-time transition of m_disposed
    readonly object m_lock = new object();

    // volatile: written by the disposing thread and read by worker threads
    volatile bool m_disposed = false;
    // signaled while the queue may contain tasks, and permanently after disposal
    readonly ManualResetEvent m_hasTasks = new ManualResetEvent(false);
    // pending tasks; the queue instance is also the lock that guards it and m_hasTasks transitions
    readonly Queue<Action> m_queue = new Queue<Action>();

    /// <summary>
    /// Creates the pool and immediately starts the minimum number of workers.
    /// </summary>
    /// <param name="min">Number of workers kept alive while idle. Clamped to <paramref name="max"/> when larger.</param>
    /// <param name="max">Upper bound for the number of simultaneously registered workers.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="min"/> is negative.</exception>
    public WorkerPool(int min, int max) {
        if (min < 0)
            throw new ArgumentOutOfRangeException("min");
        if (min > max)
            min = max;
        m_minThreads = min;
        m_maxThreads = max;

        for (int i = 0; i < m_minThreads; i++)
            StartWorker();
    }

    /// <summary>
    /// Creates a promise for the result of <paramref name="task"/>.
    /// </summary>
    /// <param name="task">The function whose result the promise represents.</param>
    /// <returns>A new promise.</returns>
    /// <exception cref="ObjectDisposedException">The pool has been disposed.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public Promise<T> Invoke<T>(Func<T> task) {
        if (m_disposed)
            throw new ObjectDisposedException(ToString());
        if (task == null)
            throw new ArgumentNullException("task");

        var promise = new Promise<T>();

        // NOTE(review): the task is never passed to EnqueueTask and the promise is
        // never completed here, so the returned promise stays pending. The scheduling
        // code is missing; it needs the Promise<T> completion API, which is not
        // visible from this file. TODO: enqueue a delegate that runs the task and
        // resolves or rejects the promise.

        return promise;
    }

    /// <summary>
    /// Tries to allocate a slot for a new worker and start its thread.
    /// </summary>
    /// <returns><c>true</c> if a worker was started; <c>false</c> if the pool already has the maximum number of workers.</returns>
    bool StartWorker() {
        // Lock-free slot allocation. On a lost race CompareExchange returns the
        // value it actually observed, which becomes the basis of the next attempt;
        // retrying with the stale value would spin forever.
        int current = m_runningThreads;
        while (true) {
            if (current >= m_maxThreads)
                // no more slots left
                return false;
            int observed = Interlocked.CompareExchange(ref m_runningThreads, current + 1, current);
            if (observed == current)
                break;
            current = observed;
        }

        // slot successfully allocated
        try {
            var worker = new Thread(this.Worker);
            worker.Start();
        } catch {
            // the thread did not start, give the slot back before propagating the error
            Interlocked.Decrement(ref m_runningThreads);
            throw;
        }

        return true;
    }

    /// <summary>
    /// Adds the task to the queue and signals the workers.
    /// </summary>
    /// <param name="task">The action to execute, must not be null.</param>
    void EnqueueTask(Action task) {
        Debug.Assert(task != null);
        lock (m_queue) {
            m_queue.Enqueue(task);
            m_hasTasks.Set();
        }
    }

    /// <summary>
    /// Blocks until a task is available, the pool is disposed, or the calling
    /// worker is no longer needed.
    /// </summary>
    /// <param name="task">Receives the dequeued task, or null when the method returns <c>false</c>.</param>
    /// <returns><c>true</c> if a task was fetched; <c>false</c> if the calling worker should exit.</returns>
    bool FetchTask(out Action task) {
        task = null;

        while (true) {

            m_hasTasks.WaitOne();

            lock (m_queue) {
                // Checked under the queue lock: Dispose signals the event under the
                // same lock, so the Reset below can never cancel the shutdown signal.
                if (m_disposed)
                    return false;

                if (m_queue.Count > 0) {
                    task = m_queue.Dequeue();
                    return true;
                }

                // no tasks left
                // the lock ensures that this reset can't suppress the signal of a newly added task
                m_hasTasks.Reset();
            }

            // the queue is empty: leave the pool if there are more workers than the minimum,
            // otherwise go back and wait for new tasks
            if (TryReleaseSlot())
                return false;
        }
    }

    /// <summary>
    /// Releases the calling worker's slot if the pool has more workers than the minimum.
    /// </summary>
    /// <returns><c>true</c> if the slot was released and the worker must exit; <c>false</c> if the worker should keep running.</returns>
    bool TryReleaseSlot() {
        int current = m_runningThreads;
        while (true) {
            if (current <= m_minThreads)
                return false;
            // as in StartWorker, continue from the observed value after a lost race
            int observed = Interlocked.CompareExchange(ref m_runningThreads, current - 1, current);
            if (observed == current)
                return true;
            current = observed;
        }
    }

    /// <summary>
    /// Thread body of a worker: executes tasks until <see cref="FetchTask"/> reports that the worker should exit.
    /// </summary>
    void Worker() {
        Action task;
        while (FetchTask(out task))
            task();
    }

    /// <summary>
    /// Marks the pool as disposed and wakes all waiting workers so that they exit.
    /// Tasks still in the queue are not executed. Does nothing when called from the finalizer.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>, <c>false</c> when called from the finalizer.</param>
    protected virtual void Dispose(bool disposing) {
        if (disposing) {
            lock (m_lock) {
                if (m_disposed)
                    return;
                m_disposed = true;
            }
            // Signal under the queue lock: a worker is either before its locked
            // section and will see m_disposed there, or it has already reset the
            // event and will be released by this Set.
            lock (m_queue) {
                m_hasTasks.Set();
            }
            // The event is left open on purpose: workers may still be about to wait on it.
            GC.SuppressFinalize(this);
        }
    }

    /// <summary>
    /// Stops the pool, see <see cref="Dispose(bool)"/>.
    /// </summary>
    public void Dispose() {
        Dispose(true);
    }

    ~WorkerPool() {
        Dispose(false);
    }
}
131 } |