comparison Implab/Parallels/AsyncQueue.cs @ 234:8dd666e6b6bf v2

Added implab nuget spec
author cin
date Thu, 05 Oct 2017 09:21:23 +0300
parents d6fe09f5592c
children
comparison
equal deleted inserted replaced
233:d6fe09f5592c 234:8dd666e6b6bf
1 using System.Threading; 1 using System.Threading;
2 using System.Collections.Generic; 2 using System.Collections.Generic;
3 using System; 3 using System;
4 using System.Collections; 4 using System.Collections;
5 using System.Diagnostics; 5 using System.Diagnostics;
6 using System.Runtime.CompilerServices;
6 7
7 namespace Implab.Parallels { 8 namespace Implab.Parallels {
8 public class AsyncQueue<T> : IEnumerable<T> { 9 public class AsyncQueue<T> : IEnumerable<T> {
9 class Chunk { 10 class Chunk {
10 public volatile Chunk next; 11 public volatile Chunk next;
47 get { return m_hi; } 48 get { return m_hi; }
48 } 49 }
49 50
50 public int Size { 51 public int Size {
51 get { return m_size; } 52 get { return m_size; }
53 }
54
55 [MethodImpl(MethodImplOptions.AggressiveInlining)]
56 void AwaitWrites(int mark) {
57 if (m_hi != mark) {
58 SpinWait spin = new SpinWait();
59 do {
60 spin.SpinOnce();
61 } while (m_hi != mark);
62 }
52 } 63 }
53 64
54 public bool TryEnqueue(T value) { 65 public bool TryEnqueue(T value) {
55 int alloc; 66 int alloc;
56 do { 67 do {
59 return false; 70 return false;
60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); 71 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
61 72
62 m_data[alloc] = value; 73 m_data[alloc] = value;
63 74
64 SpinWait spin = new SpinWait(); 75 AwaitWrites(alloc);
65 // m_hi is volatile
66 while (alloc != m_hi) {
67 // spin wait for commit
68 spin.SpinOnce();
69 }
70 m_hi = alloc + 1; 76 m_hi = alloc + 1;
71 77
72 return true; 78 return true;
73 } 79 }
74 80
75 /// <summary> 81 /// <summary>
76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete 82 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
77 /// </summary> 83 /// </summary>
78 public void Seal() { 84 public void Seal() {
79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); 85 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
80 SpinWait spin = new SpinWait(); 86 AwaitWrites(actual);
81 while (m_hi != actual) {
82 spin.SpinOnce();
83 }
84 } 87 }
85 88
86 public bool TryDequeue(out T value, out bool recycle) { 89 public bool TryDequeue(out T value, out bool recycle) {
87 int low; 90 int low;
88 do { 91 do {
112 } 115 }
113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); 116 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
114 117
115 Array.Copy(batch, offset, m_data, alloc, enqueued); 118 Array.Copy(batch, offset, m_data, alloc, enqueued);
116 119
117 SpinWait spin = new SpinWait(); 120 AwaitWrites(alloc);
118 while (alloc != m_hi) {
119 spin.SpinOnce();
120 }
121
122 m_hi = alloc + enqueued; 121 m_hi = alloc + enqueued;
123 return true; 122 return true;
124 } 123 }
125 124
126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { 125 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
359 return; 358 return;
360 } while (true); 359 } while (true);
361 } 360 }
362 361
363 public List<T> Drain() { 362 public List<T> Drain() {
364 // start the new queue 363 Chunk chunk = null;
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
366
367 do { 364 do {
368 var first = m_first; 365 var first = m_first;
369 // first.next is volatile 366 // first.next is volatile
370 if (first.next == null) { 367 if (first.next == null) {
371 if (first != m_last) 368 if (first != m_last)
372 continue; 369 continue;
373 else if (first.Hi == first.Low) 370 else if (first.Hi == first.Low)
374 return new List<T>(); 371 return new List<T>();
375 } 372 }
373
374 // start the new queue
375 if (chunk == null)
376 chunk = new Chunk(DEFAULT_CHUNK_SIZE);
376 377
377 // here we will create inconsistency which will force others to spin 378 // here we will create inconsistency which will force others to spin
378 // and prevent from fetching. chunk.next = null 379 // and prevent from fetching. chunk.next = null
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) 380 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
380 continue;// inconsistent 381 continue;// inconsistent