comparison Implab/Parallels/AsyncQueue.cs @ 121:62d2f1e98c4e v2

working version of AsyncQueue and batch operations tests
author cin
date Mon, 12 Jan 2015 18:19:41 +0300
parents f1b897999260
children 0c8685c8b56b
comparison
equal deleted inserted replaced
120:f1b897999260 121:62d2f1e98c4e
25 m_alloc = 1; 25 m_alloc = 1;
26 m_data = new T[size]; 26 m_data = new T[size];
27 m_data[0] = value; 27 m_data[0] = value;
28 } 28 }
29 29
30 public Chunk(int size, T[] data, int offset, int length, int alloc) {
31 m_size = size;
32 m_hi = length;
33 m_alloc = alloc;
34 m_data = new T[size];
35 Array.Copy(data, offset, m_data, 0, length);
36 }
37
30 public int Low { 38 public int Low {
31 get { return m_low; } 39 get { return m_low; }
32 } 40 }
33 41
34 public int Hi { 42 public int Hi {
35 get { return m_hi; } 43 get { return m_hi; }
36 } 44 }
37 45
38 public bool TryEnqueue(T value,out bool extend) { 46 public bool TryEnqueue(T value, out bool extend) {
39 var alloc = Interlocked.Increment(ref m_alloc) - 1; 47 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40 48
41 if (alloc >= m_size) { 49 if (alloc >= m_size) {
42 extend = alloc == m_size; 50 extend = alloc == m_size;
43 return false; 51 return false;
50 // spin wait for commit 58 // spin wait for commit
51 } 59 }
52 return true; 60 return true;
53 } 61 }
54 62
55 public bool TryDequeue(out T value,out bool recycle) { 63 public bool TryDequeue(out T value, out bool recycle) {
56 int low; 64 int low;
57 do { 65 do {
58 low = m_low; 66 low = m_low;
59 if (low >= m_hi) { 67 if (low >= m_hi) {
60 value = default(T); 68 value = default(T);
71 79
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc; 81 int alloc;
74 int allocSize; 82 int allocSize;
75 83
84 // if the batch is larger than the free space left in the chunk,
85 // tailGap over-allocates the chunk by one slot so that this caller
86 // gets the exclusive permission to create the next chunk.
87 int tailGap = 0;
88
76 do { 89 do {
77 alloc = m_alloc; 90 alloc = m_alloc;
78 91
79 if (alloc > m_size) { 92 if (alloc > m_size) {
80 enqueued = 0; 93 // the chunk is full and someone already
81 extend = false; 94 // creating the new one
82 return false; 95 enqueued = 0; // nothing was added
83 } 96 extend = false; // the caller shouldn't try to extend the queue
84 97 return false; // nothing was added
85 allocSize = Math.Min(m_size - m_alloc, length); 98 }
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); 99
87 100 allocSize = Math.Min(m_size - alloc, length);
88 if (alloc == m_size) { 101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
89 enqueued = 0; 102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
90 extend = true; 103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
104
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
109 if (alloc == m_size)
91 return false; 110 return false;
92 }
93 111
94 Array.Copy(batch, offset, m_data, alloc, allocSize); 112 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97 113
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { 114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit 115 // spin wait for commit
100 } 116 }
101 return true; 117 return true;
102 } 118 }
103 119
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize;
122
123 do {
124 low = m_low;
125 hi = m_hi;
126 if (low >= hi) {
127 dequeued = 0;
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again
129 return false;
130 }
131 batchSize = Math.Min(hi - low, length);
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
133
134 recycle = (low == m_size - batchSize);
135 dequeued = batchSize;
136
137 Array.Copy(m_data, low, buffer, offset, batchSize);
138
139 return true;
140 }
141
104 public T GetAt(int pos) { 142 public T GetAt(int pos) {
105 return m_data[pos]; 143 return m_data[pos];
106 } 144 }
107 } 145 }
108 146
109 public const int DEFAULT_CHUNK_SIZE = 32; 147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144;
110 149
111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; 150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
112 151
113 Chunk m_first; 152 Chunk m_first;
114 Chunk m_last; 153 Chunk m_last;
115 154
116 public AsyncQueue() { 155 public AsyncQueue() {
117 m_last = m_first = new Chunk(m_chunkSize); 156 m_last = m_first = new Chunk(m_chunkSize);
118 } 157 }
119 158
159 /// <summary>
160 /// Adds the specified value to the queue.
161 /// </summary>
162 /// <param name="value">The value which will be added to the queue.</param>
120 public void Enqueue(T value) { 163 public void Enqueue(T value) {
121 var last = m_last; 164 var last = m_last;
122 // spin wait to the new chunk 165 // spin wait to the new chunk
123 bool extend = true; 166 bool extend = true;
124 while(last == null || !last.TryEnqueue(value, out extend)) { 167 while (last == null || !last.TryEnqueue(value, out extend)) {
125 // try to extend queue 168 // try to extend queue
126 if (extend || last == null) { 169 if (extend || last == null) {
127 var chunk = new Chunk(m_chunkSize, value); 170 var chunk = new Chunk(m_chunkSize, value);
128 if (EnqueueChunk(last, chunk)) 171 if (EnqueueChunk(last, chunk))
129 break; 172 break;
130 last = m_last; 173 last = m_last;
131 } else { 174 } else {
132 while (last != m_last) { 175 while (last == m_last) {
133 Thread.MemoryBarrier(); 176 Thread.MemoryBarrier();
134 last = m_last; 177 }
135 } 178 last = m_last;
136 } 179 }
137 } 180 }
138 } 181 }
139 182
183 /// <summary>
184 /// Adds the specified data to the queue.
185 /// </summary>
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
187 /// <param name="offset">The offset of the data in the buffer.</param>
188 /// <param name="length">The size of the data to read from the buffer.</param>
189 public void EnqueueRange(T[] data, int offset, int length) {
190 if (data == null)
191 throw new ArgumentNullException("data");
192 if (offset < 0)
193 throw new ArgumentOutOfRangeException("offset");
194 if (length < 1 || offset + length > data.Length)
195 throw new ArgumentOutOfRangeException("length");
196
197 var last = m_last;
198
199 bool extend;
200 int enqueued;
201
202 while (length > 0) {
203 extend = true;
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
205 length -= enqueued;
206 offset += enqueued;
207 }
208
209 if (extend) {
210 // there was not enough space in the chunk
211 // or there were no chunks in the queue
212
213 while (length > 0) {
214
215 var size = Math.Min(length, MAX_CHUNK_SIZE);
216
217 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize),
219 data,
220 offset,
221 size,
222 length // length >= size
223 );
224
225 if (!EnqueueChunk(last, chunk)) {
226 // looks like the queue has been updated, so proceed from the beginning
227 last = m_last;
228 break;
229 }
230
231 // we have successfully added the new chunk
232 last = chunk;
233 length -= size;
234 offset += size;
235 }
236 } else {
237 // we don't need to extend the queue, if we successfully enqueued data
238 if (length == 0)
239 break;
240
241 // otherwise someone else is extending the queue:
242 // spin-wait until the last chunk changes
243 while (last == m_last) {
244 Thread.MemoryBarrier();
245 }
246
247 last = m_last;
248 }
249 }
250 }
251
252 /// <summary>
253 /// Tries to retrieve the first element from the queue.
254 /// </summary>
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
256 /// <param name="value">The value of the dequeued element.</param>
140 public bool TryDequeue(out T value) { 257 public bool TryDequeue(out T value) {
141 var chunk = m_first; 258 var chunk = m_first;
142 bool recycle; 259 bool recycle;
143 while (chunk != null) { 260 while (chunk != null) {
144 261
156 chunk = m_first; 273 chunk = m_first;
157 } 274 }
158 275
159 // the queue is empty 276 // the queue is empty
160 value = default(T); 277 value = default(T);
278 return false;
279 }
280
281 /// <summary>
282 /// Tries to dequeue the specified amount of data from the queue.
283 /// </summary>
284 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
285 /// <param name="buffer">The buffer to which the data will be written.</param>
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
287 /// <param name="length">The maximum amount of data to be retrieved.</param>
288 /// <param name="dequeued">The actual amount of the retrieved data.</param>
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
290 if (buffer == null)
291 throw new ArgumentNullException("buffer");
292 if (offset < 0)
293 throw new ArgumentOutOfRangeException("offset");
294 if (length < 1 || offset + length > buffer.Length)
295 throw new ArgumentOutOfRangeException("length");
296
297 var chunk = m_first;
298 bool recycle;
299 dequeued = 0;
300 while (chunk != null) {
301
302 int actual;
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
304 offset += actual;
305 length -= actual;
306 dequeued += actual;
307 }
308
309 if (recycle) // this chunk is waste
310 RecycleFirstChunk(chunk);
311 else if (actual == 0)
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
313
314 if (length == 0)
315 return true;
316
317 // we still may dequeue something
318 // try again
319 chunk = m_first;
320 }
321
322 return dequeued != 0;
323 }
324
325 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">The maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null)
335 throw new ArgumentNullException("buffer");
336 if (offset < 0)
337 throw new ArgumentOutOfRangeException("offset");
338 if (length < 1 || offset + length > buffer.Length)
339 throw new ArgumentOutOfRangeException("length");
340
341 var chunk = m_first;
342 bool recycle;
343 dequeued = 0;
344
345 while (chunk != null) {
346
347 int actual;
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
349 dequeued = actual;
350 }
351
352 if (recycle) // this chunk is waste
353 RecycleFirstChunk(chunk);
354
355 // if we have dequeued any data, then return
356 if (dequeued != 0)
357 return true;
358
359 // we still may dequeue something
360 // try again
361 chunk = m_first;
362 }
363
161 return false; 364 return false;
162 } 365 }
163 366
164 bool EnqueueChunk(Chunk last, Chunk chunk) { 367 bool EnqueueChunk(Chunk last, Chunk chunk) {
165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)