comparison Implab/Parallels/AsyncQueue.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents 8d5de4eb9c2c
children 8dd666e6b6bf
comparison
equal deleted inserted replaced
229:5f7a3e1d32b9 233:d6fe09f5592c
5 using System.Diagnostics; 5 using System.Diagnostics;
6 6
7 namespace Implab.Parallels { 7 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> { 8 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk { 9 class Chunk {
10 public Chunk next; 10 public volatile Chunk next;
11 11
12 int m_low; 12 volatile int m_low;
13 int m_hi; 13 volatile int m_hi;
14 int m_alloc; 14 volatile int m_alloc;
15 readonly int m_size; 15 readonly int m_size;
16 readonly T[] m_data; 16 readonly T[] m_data;
17 17
18 public Chunk(int size) { 18 public Chunk(int size) {
19 m_size = size; 19 m_size = size;
26 m_alloc = 1; 26 m_alloc = 1;
27 m_data = new T[size]; 27 m_data = new T[size];
28 m_data[0] = value; 28 m_data[0] = value;
29 } 29 }
30 30
31 public Chunk(int size, T[] data, int offset, int length, int alloc) { 31 public Chunk(int size, int allocated) {
32 m_size = size; 32 m_size = size;
33 m_hi = length; 33 m_hi = allocated;
34 m_alloc = alloc; 34 m_alloc = allocated;
35 m_data = new T[size]; 35 m_data = new T[size];
36 Array.Copy(data, offset, m_data, 0, length); 36 }
37
38 public void WriteData(T[] data, int offset, int dest, int length) {
39 Array.Copy(data, offset, m_data, dest, length);
37 } 40 }
38 41
39 public int Low { 42 public int Low {
40 get { return m_low; } 43 get { return m_low; }
41 } 44 }
46 49
47 public int Size { 50 public int Size {
48 get { return m_size; } 51 get { return m_size; }
49 } 52 }
50 53
51 public bool TryEnqueue(T value, out bool extend) { 54 public bool TryEnqueue(T value) {
52 var alloc = Interlocked.Increment(ref m_alloc) - 1; 55 int alloc;
53 56 do {
54 if (alloc >= m_size) { 57 alloc = m_alloc;
55 extend = alloc == m_size; 58 if (alloc >= m_size)
56 return false; 59 return false;
57 } 60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
58 61
59 extend = false;
60 m_data[alloc] = value; 62 m_data[alloc] = value;
61 63
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { 64 SpinWait spin = new SpinWait();
65 // m_hi is volatile
66 while (alloc != m_hi) {
63 // spin wait for commit 67 // spin wait for commit
64 } 68 spin.SpinOnce();
69 }
70 m_hi = alloc + 1;
71
65 return true; 72 return true;
66 } 73 }
67 74
68 /// <summary> 75 /// <summary>
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete 76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
70 /// </summary> 77 /// </summary>
71 public void Commit() { 78 public void Seal() {
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); 79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
73 80 SpinWait spin = new SpinWait();
74 while (m_hi != actual) 81 while (m_hi != actual) {
75 Thread.MemoryBarrier(); 82 spin.SpinOnce();
83 }
76 } 84 }
77 85
78 public bool TryDequeue(out T value, out bool recycle) { 86 public bool TryDequeue(out T value, out bool recycle) {
79 int low; 87 int low;
80 do { 88 do {
82 if (low >= m_hi) { 90 if (low >= m_hi) {
83 value = default(T); 91 value = default(T);
84 recycle = (low == m_size); 92 recycle = (low == m_size);
85 return false; 93 return false;
86 } 94 }
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); 95 } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
88 96
89 recycle = (low == m_size - 1); 97 recycle = (low + 1 == m_size);
90 value = m_data[low]; 98 value = m_data[low];
91 99
92 return true; 100 return true;
93 } 101 }
94 102
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { 103 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
96 //int alloc; 104 int alloc;
97 //int allocSize; 105 do {
98 106 alloc = m_alloc;
99 var alloc = Interlocked.Add(ref m_alloc, length) - length; 107 if (alloc >= m_size) {
100 if (alloc > m_size) { 108 enqueued = 0;
101 // the chunk is full and someone already 109 return false;
102 // creating the new one 110 } else {
103 enqueued = 0; // nothing was added 111 enqueued = Math.Min(length, m_size - alloc);
104 extend = false; // the caller shouldn't try to extend the queue 112 }
105 return false; // nothing was added 113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
106 } 114
107
108 enqueued = Math.Min(m_size - alloc, length);
109 extend = length > enqueued;
110
111 if (enqueued == 0)
112 return false;
113
114
115 Array.Copy(batch, offset, m_data, alloc, enqueued); 115 Array.Copy(batch, offset, m_data, alloc, enqueued);
116 116
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { 117 SpinWait spin = new SpinWait();
118 // spin wait for commit 118 while (alloc != m_hi) {
119 } 119 spin.SpinOnce();
120 120 }
121
122 m_hi = alloc + enqueued;
121 return true; 123 return true;
122 } 124 }
123 125
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { 126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
125 int low, hi, batchSize; 127 int low, hi, batchSize;
126 128
127 do { 129 do {
128 low = m_low; 130 low = m_low;
129 hi = m_hi; 131 hi = m_hi;
130 if (low >= hi) { 132 if (low >= hi) {
131 dequeued = 0; 133 dequeued = 0;
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again 134 recycle = (low == m_size);
133 return false; 135 return false;
134 } 136 }
135 batchSize = Math.Min(hi - low, length); 137 batchSize = Math.Min(hi - low, length);
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); 138 } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
137 139
138 recycle = (low == m_size - batchSize);
139 dequeued = batchSize; 140 dequeued = batchSize;
140 141 recycle = (low + batchSize == m_size);
141 Array.Copy(m_data, low, buffer, offset, batchSize); 142 Array.Copy(m_data, low, buffer, offset, batchSize);
142 143
143 return true; 144 return true;
144 } 145 }
145 146
147 return m_data[pos]; 148 return m_data[pos];
148 } 149 }
149 } 150 }
150 151
151 public const int DEFAULT_CHUNK_SIZE = 32; 152 public const int DEFAULT_CHUNK_SIZE = 32;
152 public const int MAX_CHUNK_SIZE = 262144; 153 public const int MAX_CHUNK_SIZE = 256;
153 154
154 Chunk m_first; 155 Chunk m_first;
155 Chunk m_last; 156 Chunk m_last;
157
158 public AsyncQueue() {
159 m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
160 }
156 161
157 /// <summary> 162 /// <summary>
158 /// Adds the specified value to the queue. 163 /// Adds the specified value to the queue.
159 /// </summary> 164 /// </summary>
160 /// <param name="value">Tha value which will be added to the queue.</param> 165 /// <param name="value">Tha value which will be added to the queue.</param>
161 public virtual void Enqueue(T value) { 166 public void Enqueue(T value) {
162 var last = m_last; 167 var last = m_last;
163 // spin wait to the new chunk 168 SpinWait spin = new SpinWait();
164 bool extend = true; 169 while (!last.TryEnqueue(value)) {
165 while (last == null || !last.TryEnqueue(value, out extend)) {
166 // try to extend queue 170 // try to extend queue
167 if (extend || last == null) { 171 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); 172 var t = Interlocked.CompareExchange(ref m_last, chunk, last);
169 if (EnqueueChunk(last, chunk)) 173 if (t == last) {
170 break; // success! exit! 174 last.next = chunk;
171 last = m_last; 175 break;
172 } else { 176 } else {
173 while (last == m_last) { 177 last = t;
174 Thread.MemoryBarrier(); 178 }
175 } 179 spin.SpinOnce();
176 last = m_last;
177 }
178 } 180 }
179 } 181 }
180 182
181 /// <summary> 183 /// <summary>
182 /// Adds the specified data to the queue. 184 /// Adds the specified data to the queue.
183 /// </summary> 185 /// </summary>
184 /// <param name="data">The buffer which contains the data to be enqueued.</param> 186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
185 /// <param name="offset">The offset of the data in the buffer.</param> 187 /// <param name="offset">The offset of the data in the buffer.</param>
186 /// <param name="length">The size of the data to read from the buffer.</param> 188 /// <param name="length">The size of the data to read from the buffer.</param>
187 public virtual void EnqueueRange(T[] data, int offset, int length) { 189 public void EnqueueRange(T[] data, int offset, int length) {
188 if (data == null) 190 if (data == null)
189 throw new ArgumentNullException("data"); 191 throw new ArgumentNullException("data");
190 if (length == 0)
191 return;
192 if (offset < 0) 192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset"); 193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length) 194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length"); 195 throw new ArgumentOutOfRangeException("length");
196 196
197 var last = m_last;
198
199 bool extend;
200 int enqueued;
201
202 while (length > 0) { 197 while (length > 0) {
203 extend = true; 198 var last = m_last;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { 199 int enqueued;
200
201 if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
205 length -= enqueued; 202 length -= enqueued;
206 offset += enqueued; 203 offset += enqueued;
207 } 204 }
208 205
209 if (extend) { 206 if (length > 0) {
210 // there was no enough space in the chunk 207 // we have something to enqueue
211 // or there was no chunks in the queue 208
212 209 var tail = length % MAX_CHUNK_SIZE;
213 while (length > 0) { 210
214 211 var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
215 var size = Math.Min(length, MAX_CHUNK_SIZE); 212
216 213 if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
217 var chunk = new Chunk( 214 continue; // we wasn't able to catch the writer, roundtrip
218 Math.Max(size, DEFAULT_CHUNK_SIZE), 215
219 data, 216 // we are lucky
220 offset, 217 // we can exclusively write our batch, the other writers will continue their work
221 size, 218
222 length // length >= size 219 length -= tail;
223 ); 220
224 221
225 if (!EnqueueChunk(last, chunk)) { 222 for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
226 // looks like the queue has been updated then proceed from the beginning 223 var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
227 last = m_last; 224 node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
228 break; 225 offset += MAX_CHUNK_SIZE;
229 } 226 // fence last.next is volatile
230 227 last.next = node;
231 // we have successfully added the new chunk 228 last = node;
232 last = chunk;
233 length -= size;
234 offset += size;
235 } 229 }
236 } else { 230
237 // we don't need to extend the queue, if we successfully enqueued data 231 if (tail > 0)
238 if (length == 0) 232 chunk.WriteData(data, offset, 0, tail);
239 break; 233
240 234 // fence last.next is volatile
241 // if we need to wait while someone is extending the queue 235 last.next = chunk;
242 // spinwait 236 return;
243 while (last == m_last) {
244 Thread.MemoryBarrier();
245 }
246
247 last = m_last;
248 } 237 }
249 } 238 }
250 } 239 }
251 240
252 /// <summary> 241 /// <summary>
254 /// </summary> 243 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> 244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param> 245 /// <param name="value">The value of the dequeued element.</param>
257 public bool TryDequeue(out T value) { 246 public bool TryDequeue(out T value) {
258 var chunk = m_first; 247 var chunk = m_first;
259 bool recycle; 248 do {
260 while (chunk != null) { 249 bool recycle;
261 250
262 var result = chunk.TryDequeue(out value, out recycle); 251 var result = chunk.TryDequeue(out value, out recycle);
263 252
264 if (recycle) // this chunk is waste 253 if (recycle && chunk.next != null) {
265 RecycleFirstChunk(chunk); 254 // this chunk is waste
266 else 255 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
256 } else {
267 return result; // this chunk is usable and returned actual result 257 return result; // this chunk is usable and returned actual result
258 }
268 259
269 if (result) // this chunk is waste but the true result is always actual 260 if (result) // this chunk is waste but the true result is always actual
270 return true; 261 return true;
271 262 } while (true);
272 // try again
273 chunk = m_first;
274 }
275
276 // the queue is empty
277 value = default(T);
278 return false;
279 } 263 }
280 264
281 /// <summary> 265 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue. 266 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary> 267 /// </summary>
293 throw new ArgumentOutOfRangeException("offset"); 277 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length) 278 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length"); 279 throw new ArgumentOutOfRangeException("length");
296 280
297 var chunk = m_first; 281 var chunk = m_first;
298 bool recycle;
299 dequeued = 0; 282 dequeued = 0;
300 while (chunk != null) { 283 do {
301 284 bool recycle;
302 int actual; 285 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { 286 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual; 287 offset += actual;
305 length -= actual; 288 length -= actual;
306 dequeued += actual; 289 dequeued += actual;
307 } 290 }
308 291
309 if (recycle) // this chunk is waste 292 if (recycle && chunk.next != null) {
310 RecycleFirstChunk(chunk); 293 // this chunk is waste
311 else if (actual == 0) 294 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) 295 } else {
296 chunk = null;
297 }
313 298
314 if (length == 0) 299 if (length == 0)
315 return true; 300 return true;
316 301 } while (chunk != null);
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
321 302
322 return dequeued != 0; 303 return dequeued != 0;
323 } 304 }
324 305
325 /// <summary> 306 /// <summary>
337 throw new ArgumentOutOfRangeException("offset"); 318 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length) 319 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length"); 320 throw new ArgumentOutOfRangeException("length");
340 321
341 var chunk = m_first; 322 var chunk = m_first;
342 bool recycle; 323 do {
343 dequeued = 0; 324 bool recycle;
344 325 chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
345 while (chunk != null) { 326
346 327 if (recycle && chunk.next != null) {
347 int actual; 328 // this chunk is waste
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { 329 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
349 dequeued = actual; 330 } else {
350 } 331 chunk = null;
351 332 }
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354 333
355 // if we have dequeued any data, then return 334 // if we have dequeued any data, then return
356 if (dequeued != 0) 335 if (dequeued != 0)
357 return true; 336 return true;
358 337
359 // we still may dequeue something 338 } while (chunk != null);
360 // try again
361 chunk = m_first;
362 }
363 339
364 return false; 340 return false;
365 } 341 }
366 342
367 bool EnqueueChunk(Chunk last, Chunk chunk) {
368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
369 return false;
370
371 if (last != null)
372 last.next = chunk;
373 else {
374 m_first = chunk;
375 }
376 return true;
377 }
378
379 void RecycleFirstChunk(Chunk first) {
380 var next = first.next;
381
382 if (first != Interlocked.CompareExchange(ref m_first, next, first))
383 return;
384
385 if (next == null) {
386
387 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
388
389 // race
390 // someone already updated the tail, restore the pointer to the queue head
391 m_first = first;
392 }
393 // the tail is updated
394 }
395 }
396 343
397 public void Clear() { 344 public void Clear() {
398 // start the new queue 345 // start the new queue
399 var chunk = new Chunk(DEFAULT_CHUNK_SIZE); 346 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
400
401 do { 347 do {
402 Thread.MemoryBarrier();
403 var first = m_first; 348 var first = m_first;
404 var last = m_last; 349 if (first.next == null && first != m_last) {
405
406 if (last == null) // nothing to clear
407 return;
408
409 if (first == null || (first.next == null && first != last)) // inconcistency
410 continue; 350 continue;
411 351 }
412 // here we will create inconsistency which will force others to spin
413 // and prevent from fetching. chunk.next = null
414 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
415 continue;// inconsistent
416
417 m_last = chunk;
418
419 return;
420
421 } while(true);
422 }
423
424 public T[] Drain() {
425 // start the new queue
426 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
427
428 do {
429 Thread.MemoryBarrier();
430 var first = m_first;
431 var last = m_last;
432
433 if (last == null)
434 return new T[0];
435
436 if (first == null || (first.next == null && first != last))
437 continue;
438 352
439 // here we will create inconsistency which will force others to spin 353 // here we will create inconsistency which will force others to spin
440 // and prevent from fetching. chunk.next = null 354 // and prevent from fetching. chunk.next = null
441 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 355 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
442 continue;// inconsistent 356 continue;// inconsistent
443 357
444 last = Interlocked.Exchange(ref m_last, chunk); 358 m_last = chunk;
359 return;
360 } while (true);
361 }
362
363 public List<T> Drain() {
364 // start the new queue
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
366
367 do {
368 var first = m_first;
369 // first.next is volatile
370 if (first.next == null) {
371 if (first != m_last)
372 continue;
373 else if (first.Hi == first.Low)
374 return new List<T>();
375 }
376
377 // here we will create inconsistency which will force others to spin
378 // and prevent from fetching. chunk.next = null
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
380 continue;// inconsistent
381
382 var last = Interlocked.Exchange(ref m_last, chunk);
445 383
446 return ReadChunks(first, last); 384 return ReadChunks(first, last);
447 385
448 } while(true); 386 } while (true);
449 } 387 }
450 388
451 static T[] ReadChunks(Chunk chunk, object last) { 389 static List<T> ReadChunks(Chunk chunk, object last) {
452 var result = new List<T>(); 390 var result = new List<T>();
453 var buffer = new T[DEFAULT_CHUNK_SIZE]; 391 var buffer = new T[MAX_CHUNK_SIZE];
454 int actual; 392 int actual;
455 bool recycle; 393 bool recycle;
394 SpinWait spin = new SpinWait();
456 while (chunk != null) { 395 while (chunk != null) {
457 // ensure all write operations on the chunk are complete 396 // ensure all write operations on the chunk are complete
458 chunk.Commit(); 397 chunk.Seal();
459 398
460 // we need to read the chunk using this way 399 // we need to read the chunk using this way
461 // since some client still may completing the dequeue 400 // since some client still may completing the dequeue
462 // operation, such clients most likely won't get results 401 // operation, such clients most likely won't get results
463 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) 402 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
465 404
466 if (chunk == last) { 405 if (chunk == last) {
467 chunk = null; 406 chunk = null;
468 } else { 407 } else {
469 while (chunk.next == null) 408 while (chunk.next == null)
470 Thread.MemoryBarrier(); 409 spin.SpinOnce();
471 chunk = chunk.next; 410 chunk = chunk.next;
472 } 411 }
473 } 412 }
474 413
475 return result.ToArray(); 414 return result;
476 } 415 }
477 416
478 struct ArraySegmentCollection : ICollection<T> { 417 struct ArraySegmentCollection : ICollection<T> {
479 readonly T[] m_data; 418 readonly T[] m_data;
480 readonly int m_offset; 419 readonly int m_offset;
499 public bool Contains(T item) { 438 public bool Contains(T item) {
500 return false; 439 return false;
501 } 440 }
502 441
503 public void CopyTo(T[] array, int arrayIndex) { 442 public void CopyTo(T[] array, int arrayIndex) {
504 Array.Copy(m_data,m_offset,array,arrayIndex, m_length); 443 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
505 } 444 }
506 445
507 public bool Remove(T item) { 446 public bool Remove(T item) {
508 throw new NotSupportedException(); 447 throw new NotSupportedException();
509 } 448 }