annotate Implab/Parallels/AsyncQueue.cs @ 281:e0916ddc9950 v3 tip

code cleanup and refactoring
author cin
date Fri, 01 Jun 2018 21:35:24 +0300
parents 8dd666e6b6bf
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
5 using System.Diagnostics;
234
8dd666e6b6bf Added implab nuget spec
cin
parents: 233
diff changeset
6 using System.Runtime.CompilerServices;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
8 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
9 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// A fixed-size segment of the queue. Writers reserve slots by advancing
/// <c>m_alloc</c> and publish them by advancing <c>m_hi</c>; readers consume
/// published slots by advancing <c>m_low</c>.
/// </summary>
class Chunk {
    /// <summary>The chunk that follows this one, or <c>null</c> if none is linked yet.</summary>
    public volatile Chunk next;

    // index of the next slot to be read
    volatile int m_low;
    // upper bound (exclusive) of the slots whose data is published to readers
    volatile int m_hi;
    // index of the next slot to be reserved by a writer
    volatile int m_alloc;
    readonly int m_size;
    readonly T[] m_data;

    /// <summary>Creates an empty chunk with the specified capacity.</summary>
    public Chunk(int size) : this(size, 0) {
    }

    /// <summary>Creates a chunk whose first slot already holds <paramref name="value"/>.</summary>
    public Chunk(int size, T value) : this(size, 1) {
        m_data[0] = value;
    }

    /// <summary>
    /// Creates a chunk in which the first <paramref name="allocated"/> slots are
    /// marked as both reserved and published; the data for them is expected to be
    /// supplied through <see cref="WriteData"/>.
    /// </summary>
    public Chunk(int size, int allocated) {
        m_data = new T[size];
        m_size = size;
        m_alloc = allocated;
        m_hi = allocated;
    }

    /// <summary>
    /// Copies <paramref name="length"/> items from <paramref name="data"/> starting at
    /// <paramref name="offset"/> into this chunk starting at slot <paramref name="dest"/>.
    /// No counters are changed.
    /// </summary>
    public void WriteData(T[] data, int offset, int dest, int length) {
        Array.Copy(data, offset, m_data, dest, length);
    }

    /// <summary>Index of the next slot to be read.</summary>
    public int Low {
        get { return m_low; }
    }

    /// <summary>Upper bound (exclusive) of the published slots.</summary>
    public int Hi {
        get { return m_hi; }
    }

    /// <summary>Capacity of the chunk.</summary>
    public int Size {
        get { return m_size; }
    }

    /// <summary>
    /// Spins until <c>m_hi</c> becomes equal to <paramref name="mark"/>.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    void AwaitWrites(int mark) {
        if (m_hi == mark)
            return;

        var spinner = new SpinWait();
        do {
            spinner.SpinOnce();
        } while (m_hi != mark);
    }

    /// <summary>
    /// Tries to append a single value to the chunk.
    /// </summary>
    /// <returns><c>false</c> if no free slot is left, <c>true</c> otherwise.</returns>
    public bool TryEnqueue(T value) {
        int slot;
        for (;;) {
            slot = m_alloc;
            if (slot >= m_size)
                return false;
            // reserve the slot; retry if another writer changed m_alloc meanwhile
            if (Interlocked.CompareExchange(ref m_alloc, slot + 1, slot) == slot)
                break;
        }

        m_data[slot] = value;

        // publish in reservation order: wait until all preceding slots are published
        AwaitWrites(slot);
        m_hi = slot + 1;

        return true;
    }

    /// <summary>
    /// Prevents from allocating new space in the chunk and waits for all write operations to complete
    /// </summary>
    public void Seal() {
        var reserved = Interlocked.Exchange(ref m_alloc, m_size);
        AwaitWrites(Math.Min(reserved, m_size));
    }

    /// <summary>
    /// Tries to take a single value from the chunk.
    /// </summary>
    /// <param name="value">The dequeued value, or <c>default(T)</c> when nothing was dequeued.</param>
    /// <param name="recycle"><c>true</c> when the read position has reached the end of the chunk.</param>
    /// <returns><c>true</c> if a value was dequeued, <c>false</c> otherwise.</returns>
    public bool TryDequeue(out T value, out bool recycle) {
        int head;
        for (;;) {
            head = m_low;
            if (head >= m_hi) {
                // nothing published beyond the read position
                value = default(T);
                recycle = (head == m_size);
                return false;
            }
            if (Interlocked.CompareExchange(ref m_low, head + 1, head) == head)
                break;
        }

        recycle = (head + 1 == m_size);
        value = m_data[head];

        return true;
    }

    /// <summary>
    /// Tries to append up to <paramref name="length"/> items from <paramref name="batch"/>.
    /// </summary>
    /// <param name="enqueued">The number of items actually written to this chunk.</param>
    /// <returns><c>false</c> if no free slot is left, <c>true</c> otherwise.</returns>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
        int start;
        for (;;) {
            start = m_alloc;
            if (start >= m_size) {
                enqueued = 0;
                return false;
            }
            // take as much as fits into the remaining space
            enqueued = Math.Min(length, m_size - start);
            if (Interlocked.CompareExchange(ref m_alloc, start + enqueued, start) == start)
                break;
        }

        Array.Copy(batch, offset, m_data, start, enqueued);

        // publish in reservation order: wait until all preceding slots are published
        AwaitWrites(start);
        m_hi = start + enqueued;
        return true;
    }

    /// <summary>
    /// Tries to take up to <paramref name="length"/> published items into <paramref name="buffer"/>.
    /// </summary>
    /// <param name="dequeued">The number of items actually copied.</param>
    /// <param name="recycle"><c>true</c> when the read position has reached the end of the chunk.</param>
    /// <returns><c>true</c> if the read position was advanced, <c>false</c> if nothing was available.</returns>
    public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
        int head, count;

        for (;;) {
            head = m_low;
            int limit = m_hi;
            if (head >= limit) {
                dequeued = 0;
                recycle = (head == m_size);
                return false;
            }
            count = Math.Min(limit - head, length);
            if (Interlocked.CompareExchange(ref m_low, head + count, head) == head)
                break;
        }

        dequeued = count;
        recycle = (head + count == m_size);
        Array.Copy(m_data, head, buffer, offset, count);

        return true;
    }

    /// <summary>Returns the item stored in slot <paramref name="pos"/> without any checks of the counters.</summary>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
150
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Capacity of the chunks created for single-item enqueues and the minimum
/// capacity of the trailing chunk created by a range enqueue.
/// </summary>
public const int DEFAULT_CHUNK_SIZE = 32;

/// <summary>
/// Capacity of the completely filled chunks created by a range enqueue.
/// </summary>
public const int MAX_CHUNK_SIZE = 256;

// the chunk from which items are dequeued
private Chunk m_first;
// the chunk to which new items are appended
private Chunk m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
156
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
/// <summary>
/// Creates an empty queue consisting of a single chunk of the default size.
/// </summary>
public AsyncQueue() {
    var initial = new Chunk(DEFAULT_CHUNK_SIZE);
    // a single chunk serves as both the head and the tail
    m_last = initial;
    m_first = initial;
}
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
160
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public void Enqueue(T value) {
    var tail = m_last;
    var spinner = new SpinWait();

    for (;;) {
        if (tail.TryEnqueue(value))
            return;

        // the tail chunk is full: try to extend the queue with a chunk
        // which already contains the value
        var fresh = new Chunk(DEFAULT_CHUNK_SIZE, value);
        var seen = Interlocked.CompareExchange(ref m_last, fresh, tail);

        if (seen == tail) {
            // we have replaced the tail, link the new chunk to the old one
            tail.next = fresh;
            return;
        }

        // another writer has replaced the tail first, retry with its chunk
        tail = seen;
        spinner.SpinOnce();
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
181
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1,
/// or the specified range does not fit into <paramref name="data"/>.
/// </exception>
public void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // Compare against the remaining space rather than computing offset + length:
    // the sum can overflow and wrap to a negative value, which would let an invalid
    // range through and make the copy below fail after the queue state has
    // already been modified.
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    while (length > 0) {
        var last = m_last;
        int enqueued;

        // first fill the free space of the current tail chunk
        if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (length > 0) {
            // we have something to enqueue

            // the remainder which doesn't fill a whole MAX_CHUNK_SIZE chunk goes
            // to the new tail chunk, its slots are marked as allocated in advance
            var tail = length % MAX_CHUNK_SIZE;

            var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);

            if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
                continue; // we weren't able to catch the writer, roundtrip

            // we are lucky
            // we can exclusively write our batch, the other writers will continue their work

            length -= tail;

            // the rest is a multiple of MAX_CHUNK_SIZE, write it as a chain of full chunks
            for (var i = 0; i < length; i += MAX_CHUNK_SIZE) {
                var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
                node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
                offset += MAX_CHUNK_SIZE;
                // fence last.next is volatile
                last.next = node;
                last = node;
            }

            if (tail > 0)
                chunk.WriteData(data, offset, 0, tail);

            // fence last.next is volatile
            last.next = chunk;
            return;
        }
    }
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
239
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to retrieve the first element from the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element.</param>
public bool TryDequeue(out T value) {
    var current = m_first;

    while (true) {
        bool exhausted;
        var taken = current.TryDequeue(out value, out exhausted);

        // the chunk is still usable, or there is nothing to switch to:
        // the result obtained from the chunk is the actual one
        if (!exhausted || current.next == null)
            return taken;

        // the chunk is waste, try to move the head to the next chunk and
        // continue with whatever head the exchange reports
        current = Interlocked.CompareExchange(ref m_first, current.next, current);

        // the chunk is waste but the true result is always actual
        if (taken)
            return true;
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
263
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
264 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
265 /// Tries to dequeue the specified amount of data from the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
266 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
267 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
268 /// <param name="buffer">The buffer to which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
269 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
270 /// <param name="length">The maximum amount of data to be retrieved.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
271 /// <param name="dequeued">The actual amout of the retrieved data.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range check is written as a subtraction on purpose: 'offset + length'
    // may wrap around to a negative number for very large arguments and would
    // then slip through the validation. Both operands of the subtraction are
    // non-negative here, so it cannot overflow.
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    dequeued = 0;
    do {
        bool recycle;
        int actual;

        // take as much as the current chunk can give and advance the window
        // in the destination buffer accordingly
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle && chunk.next != null) {
            // this chunk is waste: try to unlink it from the head of the queue.
            // CompareExchange returns the value m_first held before the call,
            // so the loop continues with whatever chunk was the head at that moment.
            chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
        } else {
            // nothing more to read beyond this chunk right now
            chunk = null;
        }

        // the requested amount has been fully satisfied
        if (length == 0)
            return true;
    } while (chunk != null);

    // partial success still counts as success
    return dequeued != 0;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
304
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue the data remaining in the first chunk, but no more than
/// <paramref name="length"/> items. The method returns as soon as a single
/// batch has produced any data.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, <paramref name="length"/> is less than one,
/// or the requested range does not fit into <paramref name="buffer"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range check is written as a subtraction on purpose: 'offset + length'
    // may wrap around to a negative number for very large arguments and would
    // then slip through the validation. Both operands of the subtraction are
    // non-negative here, so it cannot overflow.
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    do {
        bool recycle;
        chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);

        if (recycle && chunk.next != null) {
            // this chunk is waste: try to unlink it from the head of the queue.
            // CompareExchange returns the value m_first held before the call,
            // so a further iteration works with the chunk that was the head then.
            chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
        } else {
            chunk = null;
        }

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

    } while (chunk != null);

    return false;
}
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
341
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
342
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Drops the current content of the queue by installing a single fresh chunk
/// as both the first and the last chunk.
/// </summary>
public void Clear() {
    // the chunk the emptied queue will start with
    var replacement = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        var head = m_first;

        // a head without a successor that is not the tail at the same time
        // means the chunk list is being modified right now, so retry
        if (head.next == null && head != m_last)
            continue;

        // Swapping the head first leaves m_first and m_last pointing to
        // different chunks while replacement.next is still null. That is the
        // very state rejected by the check above, so it makes the concurrent
        // Clear/Drain callers spin until the tail is updated below.
        if (Interlocked.CompareExchange(ref m_first, replacement, head) == head) {
            m_last = replacement;
            return;
        }
        // somebody else has changed the head in the meantime, start over
    }
}
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
361
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
/// <summary>
/// Detaches the whole chain of chunks from the queue, replaces it with a new
/// empty chunk and returns the detached items as a list.
/// </summary>
/// <returns>The items read from the detached chunks; an empty list when the queue holds nothing.</returns>
public List<T> Drain() {
    // allocated lazily: an empty queue is answered without creating a chunk
    Chunk replacement = null;

    while (true) {
        var head = m_first;

        // head.next is volatile
        if (head.next == null) {
            // the head has no successor but is not the tail either:
            // the chunk list is being modified, retry
            if (head != m_last)
                continue;

            // the only chunk has nothing left in it
            if (head.Hi == head.Low)
                return new List<T>();
        }

        // start the new queue
        if (replacement == null)
            replacement = new Chunk(DEFAULT_CHUNK_SIZE);

        // Swapping the head first leaves m_first and m_last pointing to
        // different chunks while replacement.next is still null. That is the
        // very state rejected by the check above, so it makes the concurrent
        // Clear/Drain callers spin until the tail is exchanged below.
        if (Interlocked.CompareExchange(ref m_first, replacement, head) != head)
            continue; // the head has been changed by someone else

        var tail = Interlocked.Exchange(ref m_last, replacement);

        return ReadChunks(head, tail);
    }
}
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
389
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
/// <summary>
/// Reads the items from a chain of chunks, starting with <paramref name="chunk"/>
/// and finishing with the chunk referenced by <paramref name="last"/>.
/// </summary>
/// <param name="chunk">The first chunk of the chain.</param>
/// <param name="last">The chunk after which reading stops (compared by reference).</param>
/// <returns>The items taken from the chunks in the order they were read.</returns>
static List<T> ReadChunks(Chunk chunk, object last) {
    var items = new List<T>();
    var scratch = new T[MAX_CHUNK_SIZE];
    var spinner = new SpinWait();

    for (var current = chunk; current != null;) {
        // ensure all write operations on the chunk are complete
        current.Seal();

        // The chunk is emptied through the regular dequeue routine because
        // some clients may still be finishing their own dequeue operations
        // on it; such clients most likely won't get any results.
        int count;
        bool recycle;
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out recycle))
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));

        // the end of the detached chain has been reached
        if (object.ReferenceEquals(current, last))
            break;

        // the link to the successor may not be published yet, wait for it
        while (current.next == null)
            spinner.SpinOnce();

        current = current.next;
    }

    return items;
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
417
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// A read-only <see cref="ICollection{T}"/> view over a segment of an array.
/// It exposes <see cref="Count"/> and <see cref="CopyTo"/> so that the segment
/// can be handed over to collection-consuming methods such as
/// <c>List&lt;T&gt;.AddRange</c> without copying it into a temporary collection.
/// </summary>
struct ArraySegmentCollection : ICollection<T> {
    // the array which holds the data
    readonly T[] m_data;
    // the index of the first element of the segment
    readonly int m_offset;
    // the number of elements in the segment
    readonly int m_length;

    /// <summary>
    /// Creates a view over <paramref name="length"/> elements of
    /// <paramref name="data"/> starting at <paramref name="offset"/>.
    /// The arguments are stored as is and are not validated.
    /// </summary>
    public ArraySegmentCollection(T[] data, int offset, int length) {
        m_data = data;
        m_offset = offset;
        m_length = length;
    }

    #region ICollection implementation

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Add(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Clear() {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Determines whether the segment contains the specified item.
    /// </summary>
    /// <returns><c>true</c> if the item is found within the segment, <c>false</c> otherwise.</returns>
    public bool Contains(T item) {
        // an empty segment (including the default instance whose array is null)
        // contains nothing, the search is limited to the segment bounds
        return m_length > 0 && Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
    }

    /// <summary>
    /// Copies the elements of the segment to <paramref name="array"/>
    /// starting at <paramref name="arrayIndex"/>.
    /// </summary>
    public void CopyTo(T[] array, int arrayIndex) {
        Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public bool Remove(T item) {
        throw new NotSupportedException();
    }

    /// <summary>The number of elements in the segment.</summary>
    public int Count {
        get {
            return m_length;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool IsReadOnly {
        get {
            return true;
        }
    }

    #endregion

    #region IEnumerable implementation

    /// <summary>Enumerates the elements of the segment in the array order.</summary>
    public IEnumerator<T> GetEnumerator() {
        for (int i = m_offset; i < m_length + m_offset; i++)
            yield return m_data[i];
    }

    #endregion

    #region IEnumerable implementation

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
482
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
483 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
484
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Walks the items of a chain of chunks starting from the chunk passed to the
/// constructor. <see cref="Reset"/> is not supported.
/// </summary>
class Enumerator : IEnumerator<T> {
    // the chunk being enumerated, null once the enumeration is over
    Chunk m_current;
    // the position inside m_current, -1 until the first MoveNext call
    int m_pos = -1;

    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>
    /// Advances to the next item.
    /// </summary>
    /// <returns><c>false</c> when there is nothing more to enumerate, <c>true</c> otherwise.</returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        // the very first step starts at the lower bound of the chunk,
        // every following one just moves forward
        m_pos = m_pos == -1 ? m_current.Low : m_pos + 1;

        if (m_pos != m_current.Hi)
            return true;

        // The upper bound of the chunk has been reached. The next chunk is
        // followed only when this position is also the size of the chunk,
        // otherwise the enumeration ends here.
        m_current = m_pos == m_current.Size ? m_current.next : null;
        m_pos = 0;

        return m_current != null;
    }

    /// <summary>Not supported.</summary>
    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>Does nothing.</summary>
    public void Dispose() {
    }

    #endregion

    #region IEnumerator implementation

    /// <summary>
    /// The item at the current position.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The enumeration has not been started yet or is already over.
    /// </exception>
    public T Current {
        get {
            if (m_current == null || m_pos == -1)
                throw new InvalidOperationException();
            return m_current.GetAt(m_pos);
        }
    }

    #endregion
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
548
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Creates an enumerator which starts from the chunk that is the first one
/// at the moment of the call.
/// </summary>
public IEnumerator<T> GetEnumerator() {
    var head = m_first;
    return new Enumerator(head);
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
552
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
553 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
554
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
555 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
556
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
// The non-generic enumeration is served by the generic enumerator.
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
560
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
561 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
562 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
563 }