comparison Implab/Parallels/DispatchPool.cs @ 17:7cd4a843b4e4 promises

Improved worker pool
author cin
date Fri, 08 Nov 2013 01:25:42 +0400
parents 5a4b735ba669
children 1c3b3d518480
comparison
equal deleted inserted replaced
16:5a4b735ba669 17:7cd4a843b4e4
60 get { 60 get {
61 return m_exitRequired != 0; 61 return m_exitRequired != 0;
62 } 62 }
63 } 63 }
64 64
65 bool StartWorker() { 65 protected abstract bool TryDequeue(out TUnit unit);
66
67 protected virtual bool ExtendPool() {
68 if (m_suspended > 0) {
69 m_hasTasks.Set();
70 return true;
71 } else
72 return StartWorker();
73 }
74
75 /// <summary>
76 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
77 /// </summary>
78 protected void WakePool() {
79 m_hasTasks.Set(); // wake sleeping thread;
80
81 if (AllocateThreadSlot(1)) {
82 var worker = new Thread(this.Worker);
83 worker.IsBackground = true;
84 worker.Start();
85 }
86 }
87
88 protected virtual void Suspend() {
89 m_hasTasks.WaitOne();
90 }
91
92 #region thread slots traits
93
94 bool AllocateThreadSlot() {
66 int current; 95 int current;
67 // use spins to allocate slot for the new thread 96 // use spins to allocate slot for the new thread
68 do { 97 do {
69 current = m_runningThreads; 98 current = m_runningThreads;
70 if (current >= m_maxThreads || m_exitRequired != 0) 99 if (current >= m_maxThreads || m_exitRequired != 0)
71 // no more slots left or the pool has been disposed 100 // no more slots left or the pool has been disposed
72 return false; 101 return false;
73 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); 102 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
74 103
75 m_maxRunningThreads = Math.Max(m_maxRunningThreads, current + 1); 104 UpdateMaxThreads(current + 1);
76
77 // slot successfully allocated
78
79 var worker = new Thread(this.Worker);
80 worker.IsBackground = true;
81 worker.Start();
82 105
83 return true; 106 return true;
84 } 107 }
85 108
86 protected abstract bool TryDequeue(out TUnit unit); 109 bool AllocateThreadSlot(int desired) {
87 110 if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1))
88 protected virtual void WakeNewWorker(bool extend) { 111 return false;
89 if (m_suspended > 0) 112
90 m_hasTasks.Set(); 113 UpdateMaxThreads(desired);
91 else 114
92 StartWorker(); 115 return true;
116 }
117
118 bool ReleaseThreadSlot(out bool last) {
119 last = false;
120 int current;
121 // use spins to release slot for the new thread
122 do {
123 current = m_runningThreads;
124 if (current <= m_minThreads && m_exitRequired == 0)
125 // the thread is reserved
126 return false;
127 } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
128
129 last = (current == 1);
130
131 return true;
93 } 132 }
94 133
95 /// <summary> 134 /// <summary>
96 /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока 135 /// releases thread slot unconditionally, used during cleanup
97 /// </summary> 136 /// </summary>
98 protected void StartIfIdle() { 137 /// <returns>true - no more threads left</returns>
99 int threads; 138 bool ReleaseThreadSlotAnyway() {
100 do { 139 var left = Interlocked.Decrement(ref m_runningThreads);
101 140 return left == 0;
102 } 141 }
103 } 142
104 143 void UpdateMaxThreads(int count) {
105 protected virtual void Suspend() { 144 int max;
106 m_hasTasks.WaitOne(); 145 do {
146 max = m_maxRunningThreads;
147 if (max >= count)
148 break;
149 } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max));
150 }
151
152 #endregion
153
154 bool StartWorker() {
155 if (AllocateThreadSlot()) {
156 // slot successfully allocated
157 var worker = new Thread(this.Worker);
158 worker.IsBackground = true;
159 worker.Start();
160
161 return true;
162 } else {
163 return false;
164 }
107 } 165 }
108 166
109 bool FetchTask(out TUnit unit) { 167 bool FetchTask(out TUnit unit) {
110 do { 168 do {
111 // exit if requested 169 // exit if requested
112 if (m_exitRequired != 0) { 170 if (m_exitRequired != 0) {
113 // release the thread slot 171 // release the thread slot
114 var running = Interlocked.Decrement(ref m_runningThreads); 172 if (ReleaseThreadSlotAnyway()) // it was the last worker
115 if (running == 0) // it was the last worker
116 m_hasTasks.Dispose(); 173 m_hasTasks.Dispose();
117 else 174 else
118 m_hasTasks.Set(); // release next worker 175 m_hasTasks.Set(); // wake next worker
119 unit = default(TUnit); 176 unit = default(TUnit);
120 return false; 177 return false;
121 } 178 }
122 179
123 // fetch task 180 // fetch task
124 if (TryDequeue(out unit)) { 181 if (TryDequeue(out unit)) {
125 WakeNewWorker(true); 182 ExtendPool();
126 return true; 183 return true;
127 } 184 }
128 185
129 //no tasks left, exit if the thread is no longer needed 186 //no tasks left, exit if the thread is no longer needed
130 int runningThreads; 187 bool last;
131 bool exit = true; 188 if (ReleaseThreadSlot(out last)) {
132 do { 189 if (last && m_hasTasks.WaitOne(0)) {
133 runningThreads = m_runningThreads; 190 if (AllocateThreadSlot(1))
134 if (runningThreads <= m_minThreads) { 191 continue; // spin again...
135 // check wheather this is the last thread and we have tasks 192 else
136 193 // we failed to reallocate slot for this thread
137 exit = false; 194 // therefore we need to release the event
138 break; 195 m_hasTasks.Set();
139 } 196 }
140 } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); 197
141
142 if (exit) {
143 return false; 198 return false;
144 } 199 }
145 200
146 // entering suspend state 201 // entering suspend state
147 Interlocked.Increment(ref m_suspended); 202 Interlocked.Increment(ref m_suspended);