Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 234:8dd666e6b6bf v2
Added implab nuget spec
author | cin |
---|---|
date | Thu, 05 Oct 2017 09:21:23 +0300 |
parents | d6fe09f5592c |
children |
comparison
equal
deleted
inserted
replaced
233:d6fe09f5592c | 234:8dd666e6b6bf |
---|---|
1 using System.Threading; | 1 using System.Threading; |
2 using System.Collections.Generic; | 2 using System.Collections.Generic; |
3 using System; | 3 using System; |
4 using System.Collections; | 4 using System.Collections; |
5 using System.Diagnostics; | 5 using System.Diagnostics; |
6 using System.Runtime.CompilerServices; | |
6 | 7 |
7 namespace Implab.Parallels { | 8 namespace Implab.Parallels { |
8 public class AsyncQueue<T> : IEnumerable<T> { | 9 public class AsyncQueue<T> : IEnumerable<T> { |
9 class Chunk { | 10 class Chunk { |
10 public volatile Chunk next; | 11 public volatile Chunk next; |
47 get { return m_hi; } | 48 get { return m_hi; } |
48 } | 49 } |
49 | 50 |
50 public int Size { | 51 public int Size { |
51 get { return m_size; } | 52 get { return m_size; } |
53 } | |
54 | |
55 [MethodImpl(MethodImplOptions.AggressiveInlining)] | |
56 void AwaitWrites(int mark) { | |
57 if (m_hi != mark) { | |
58 SpinWait spin = new SpinWait(); | |
59 do { | |
60 spin.SpinOnce(); | |
61 } while (m_hi != mark); | |
62 } | |
52 } | 63 } |
53 | 64 |
54 public bool TryEnqueue(T value) { | 65 public bool TryEnqueue(T value) { |
55 int alloc; | 66 int alloc; |
56 do { | 67 do { |
59 return false; | 70 return false; |
60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); | 71 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); |
61 | 72 |
62 m_data[alloc] = value; | 73 m_data[alloc] = value; |
63 | 74 |
64 SpinWait spin = new SpinWait(); | 75 AwaitWrites(alloc); |
65 // m_hi is volatile | |
66 while (alloc != m_hi) { | |
67 // spin wait for commit | |
68 spin.SpinOnce(); | |
69 } | |
70 m_hi = alloc + 1; | 76 m_hi = alloc + 1; |
71 | 77 |
72 return true; | 78 return true; |
73 } | 79 } |
74 | 80 |
75 /// <summary> | 81 /// <summary> |
76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete | 82 /// Prevents from allocating new space in the chunk and waits for all write operations to complete |
77 /// </summary> | 83 /// </summary> |
78 public void Seal() { | 84 public void Seal() { |
79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); | 85 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); |
80 SpinWait spin = new SpinWait(); | 86 AwaitWrites(actual); |
81 while (m_hi != actual) { | |
82 spin.SpinOnce(); | |
83 } | |
84 } | 87 } |
85 | 88 |
86 public bool TryDequeue(out T value, out bool recycle) { | 89 public bool TryDequeue(out T value, out bool recycle) { |
87 int low; | 90 int low; |
88 do { | 91 do { |
112 } | 115 } |
113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); | 116 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); |
114 | 117 |
115 Array.Copy(batch, offset, m_data, alloc, enqueued); | 118 Array.Copy(batch, offset, m_data, alloc, enqueued); |
116 | 119 |
117 SpinWait spin = new SpinWait(); | 120 AwaitWrites(alloc); |
118 while (alloc != m_hi) { | |
119 spin.SpinOnce(); | |
120 } | |
121 | |
122 m_hi = alloc + enqueued; | 121 m_hi = alloc + enqueued; |
123 return true; | 122 return true; |
124 } | 123 } |
125 | 124 |
126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { | 125 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { |
359 return; | 358 return; |
360 } while (true); | 359 } while (true); |
361 } | 360 } |
362 | 361 |
363 public List<T> Drain() { | 362 public List<T> Drain() { |
364 // start the new queue | 363 Chunk chunk = null; |
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE); | |
366 | |
367 do { | 364 do { |
368 var first = m_first; | 365 var first = m_first; |
369 // first.next is volatile | 366 // first.next is volatile |
370 if (first.next == null) { | 367 if (first.next == null) { |
371 if (first != m_last) | 368 if (first != m_last) |
372 continue; | 369 continue; |
373 else if (first.Hi == first.Low) | 370 else if (first.Hi == first.Low) |
374 return new List<T>(); | 371 return new List<T>(); |
375 } | 372 } |
373 | |
374 // start the new queue | |
375 if (chunk == null) | |
376 chunk = new Chunk(DEFAULT_CHUNK_SIZE); | |
376 | 377 |
377 // here we will create inconsistency which will force others to spin | 378 // here we will create inconsistency which will force others to spin |
378 // and prevent from fetching. chunk.next = null | 379 // and prevent from fetching. chunk.next = null |
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) | 380 if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) |
380 continue;// inconsistent | 381 continue;// inconsistent |