Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 18:19:41 +0300 |
parents | f1b897999260 |
children | 0c8685c8b56b |
comparison
equal
deleted
inserted
replaced
120:f1b897999260 | 121:62d2f1e98c4e |
---|---|
25 m_alloc = 1; | 25 m_alloc = 1; |
26 m_data = new T[size]; | 26 m_data = new T[size]; |
27 m_data[0] = value; | 27 m_data[0] = value; |
28 } | 28 } |
29 | 29 |
30 public Chunk(int size, T[] data, int offset, int length, int alloc) { | |
31 m_size = size; | |
32 m_hi = length; | |
33 m_alloc = alloc; | |
34 m_data = new T[size]; | |
35 Array.Copy(data, offset, m_data, 0, length); | |
36 } | |
37 | |
30 public int Low { | 38 public int Low { |
31 get { return m_low; } | 39 get { return m_low; } |
32 } | 40 } |
33 | 41 |
34 public int Hi { | 42 public int Hi { |
35 get { return m_hi; } | 43 get { return m_hi; } |
36 } | 44 } |
37 | 45 |
38 public bool TryEnqueue(T value,out bool extend) { | 46 public bool TryEnqueue(T value, out bool extend) { |
39 var alloc = Interlocked.Increment(ref m_alloc) - 1; | 47 var alloc = Interlocked.Increment(ref m_alloc) - 1; |
40 | 48 |
41 if (alloc >= m_size) { | 49 if (alloc >= m_size) { |
42 extend = alloc == m_size; | 50 extend = alloc == m_size; |
43 return false; | 51 return false; |
50 // spin wait for commit | 58 // spin wait for commit |
51 } | 59 } |
52 return true; | 60 return true; |
53 } | 61 } |
54 | 62 |
55 public bool TryDequeue(out T value,out bool recycle) { | 63 public bool TryDequeue(out T value, out bool recycle) { |
56 int low; | 64 int low; |
57 do { | 65 do { |
58 low = m_low; | 66 low = m_low; |
59 if (low >= m_hi) { | 67 if (low >= m_hi) { |
60 value = default(T); | 68 value = default(T); |
71 | 79 |
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { |
73 int alloc; | 81 int alloc; |
74 int allocSize; | 82 int allocSize; |
75 | 83 |
84 // in case the batch size is larger than a free space in chunk | |
85 // tailGap is used to over allocate the space in the chunk to | |
86 // get exclusive permission on creation of the next one. | |
87 int tailGap = 0; | |
88 | |
76 do { | 89 do { |
77 alloc = m_alloc; | 90 alloc = m_alloc; |
78 | 91 |
79 if (alloc > m_size) { | 92 if (alloc > m_size) { |
80 enqueued = 0; | 93 // the chunk is full and someone is already |
81 extend = false; | 94 // creating the new one |
82 return false; | 95 enqueued = 0; // nothing was added |
83 } | 96 extend = false; // the caller shouldn't try to extend the queue |
84 | 97 return false; // nothing was added |
85 allocSize = Math.Min(m_size - m_alloc, length); | 98 } |
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); | 99 |
87 | 100 allocSize = Math.Min(m_size - alloc, length); |
88 if (alloc == m_size) { | 101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch |
89 enqueued = 0; | 102 tailGap = 1; // overallocate space to get exclusive permission to extend queue |
90 extend = true; | 103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); |
104 | |
105 extend = tailGap != 0; | |
106 enqueued = allocSize; | |
107 | |
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | |
109 if (alloc == m_size) | |
91 return false; | 110 return false; |
92 } | |
93 | 111 |
94 Array.Copy(batch, offset, m_data, alloc, allocSize); | 112 Array.Copy(batch, offset, m_data, alloc, allocSize); |
95 enqueued = allocSize; | |
96 extend = false; | |
97 | 113 |
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | 114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { |
99 // spin wait for commit | 115 // spin wait for commit |
100 } | 116 } |
101 return true; | 117 return true; |
102 } | 118 } |
103 | 119 |
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { | |
121 int low, hi, batchSize; | |
122 | |
123 do { | |
124 low = m_low; | |
125 hi = m_hi; | |
126 if (low >= hi) { | |
127 dequeued = 0; | |
128 recycle = (low == m_size); // recycling could be restarted and we need to signal again | |
129 return false; | |
130 } | |
131 batchSize = Math.Min(hi - low, length); | |
132 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); | |
133 | |
134 recycle = (low == m_size - batchSize); | |
135 dequeued = batchSize; | |
136 | |
137 Array.Copy(m_data, low, buffer, offset, batchSize); | |
138 | |
139 return true; | |
140 } | |
141 | |
104 public T GetAt(int pos) { | 142 public T GetAt(int pos) { |
105 return m_data[pos]; | 143 return m_data[pos]; |
106 } | 144 } |
107 } | 145 } |
108 | 146 |
109 public const int DEFAULT_CHUNK_SIZE = 32; | 147 public const int DEFAULT_CHUNK_SIZE = 32; |
148 public const int MAX_CHUNK_SIZE = 262144; | |
110 | 149 |
111 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | 150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; |
112 | 151 |
113 Chunk m_first; | 152 Chunk m_first; |
114 Chunk m_last; | 153 Chunk m_last; |
115 | 154 |
116 public AsyncQueue() { | 155 public AsyncQueue() { |
117 m_last = m_first = new Chunk(m_chunkSize); | 156 m_last = m_first = new Chunk(m_chunkSize); |
118 } | 157 } |
119 | 158 |
159 /// <summary> | |
160 /// Adds the specified value to the queue. | |
161 /// </summary> | |
162 /// <param name="value">Tha value which will be added to the queue.</param> | |
120 public void Enqueue(T value) { | 163 public void Enqueue(T value) { |
121 var last = m_last; | 164 var last = m_last; |
122 // spin wait to the new chunk | 165 // spin wait to the new chunk |
123 bool extend = true; | 166 bool extend = true; |
124 while(last == null || !last.TryEnqueue(value, out extend)) { | 167 while (last == null || !last.TryEnqueue(value, out extend)) { |
125 // try to extend queue | 168 // try to extend queue |
126 if (extend || last == null) { | 169 if (extend || last == null) { |
127 var chunk = new Chunk(m_chunkSize, value); | 170 var chunk = new Chunk(m_chunkSize, value); |
128 if (EnqueueChunk(last, chunk)) | 171 if (EnqueueChunk(last, chunk)) |
129 break; | 172 break; |
130 last = m_last; | 173 last = m_last; |
131 } else { | 174 } else { |
132 while (last != m_last) { | 175 while (last == m_last) { |
133 Thread.MemoryBarrier(); | 176 Thread.MemoryBarrier(); |
134 last = m_last; | 177 } |
135 } | 178 last = m_last; |
136 } | 179 } |
137 } | 180 } |
138 } | 181 } |
139 | 182 |
183 /// <summary> | |
184 /// Adds the specified data to the queue. | |
185 /// </summary> | |
186 /// <param name="data">The buffer which contains the data to be enqueued.</param> | |
187 /// <param name="offset">The offset of the data in the buffer.</param> | |
188 /// <param name="length">The size of the data to read from the buffer.</param> | |
189 public void EnqueueRange(T[] data, int offset, int length) { | |
190 if (data == null) | |
191 throw new ArgumentNullException("data"); | |
192 if (offset < 0) | |
193 throw new ArgumentOutOfRangeException("offset"); | |
194 if (length < 1 || offset + length > data.Length) | |
195 throw new ArgumentOutOfRangeException("length"); | |
196 | |
197 var last = m_last; | |
198 | |
199 bool extend; | |
200 int enqueued; | |
201 | |
202 while (length > 0) { | |
203 extend = true; | |
204 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { | |
205 length -= enqueued; | |
206 offset += enqueued; | |
207 } | |
208 | |
209 if (extend) { | |
210 // there was not enough space in the chunk | |
211 // or there were no chunks in the queue | |
212 | |
213 while (length > 0) { | |
214 | |
215 var size = Math.Min(length, MAX_CHUNK_SIZE); | |
216 | |
217 var chunk = new Chunk( | |
218 Math.Max(size, m_chunkSize), | |
219 data, | |
220 offset, | |
221 size, | |
222 length // length >= size | |
223 ); | |
224 | |
225 if (!EnqueueChunk(last, chunk)) { | |
226 // looks like the queue has been updated then proceed from the beginning | |
227 last = m_last; | |
228 break; | |
229 } | |
230 | |
231 // we have successfully added the new chunk | |
232 last = chunk; | |
233 length -= size; | |
234 offset += size; | |
235 } | |
236 } else { | |
237 // we don't need to extend the queue, if we successfully enqueued data | |
238 if (length == 0) | |
239 break; | |
240 | |
241 // if we need to wait while someone is extending the queue | |
242 // spinwait | |
243 while (last == m_last) { | |
244 Thread.MemoryBarrier(); | |
245 } | |
246 | |
247 last = m_last; | |
248 } | |
249 } | |
250 } | |
251 | |
252 /// <summary> | |
253 /// Tries to retrieve the first element from the queue. | |
254 /// </summary> | |
255 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> | |
256 /// <param name="value">The value of the dequeued element.</param> | |
140 public bool TryDequeue(out T value) { | 257 public bool TryDequeue(out T value) { |
141 var chunk = m_first; | 258 var chunk = m_first; |
142 bool recycle; | 259 bool recycle; |
143 while (chunk != null) { | 260 while (chunk != null) { |
144 | 261 |
156 chunk = m_first; | 273 chunk = m_first; |
157 } | 274 } |
158 | 275 |
159 // the queue is empty | 276 // the queue is empty |
160 value = default(T); | 277 value = default(T); |
278 return false; | |
279 } | |
280 | |
281 /// <summary> | |
282 /// Tries to dequeue the specified amount of data from the queue. | |
283 /// </summary> | |
284 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | |
285 /// <param name="buffer">The buffer to which the data will be written.</param> | |
286 /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |
287 /// <param name="length">The maximum amount of data to be retrieved.</param> | |
288 /// <param name="dequeued">The actual amount of the retrieved data.</param> | |
289 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { | |
290 if (buffer == null) | |
291 throw new ArgumentNullException("buffer"); | |
292 if (offset < 0) | |
293 throw new ArgumentOutOfRangeException("offset"); | |
294 if (length < 1 || offset + length > buffer.Length) | |
295 throw new ArgumentOutOfRangeException("length"); | |
296 | |
297 var chunk = m_first; | |
298 bool recycle; | |
299 dequeued = 0; | |
300 while (chunk != null) { | |
301 | |
302 int actual; | |
303 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { | |
304 offset += actual; | |
305 length -= actual; | |
306 dequeued += actual; | |
307 } | |
308 | |
309 if (recycle) // this chunk is waste | |
310 RecycleFirstChunk(chunk); | |
311 else if (actual == 0) | |
312 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) | |
313 | |
314 if (length == 0) | |
315 return true; | |
316 | |
317 // we still may dequeue something | |
318 // try again | |
319 chunk = m_first; | |
320 } | |
321 | |
322 return dequeued != 0; | |
323 } | |
324 | |
325 /// <summary> | |
326 /// Tries to dequeue all remaining data in the first chunk. | |
327 /// </summary> | |
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | |
329 /// <param name="buffer">The buffer to which data will be written.</param> | |
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param> | |
331 /// <param name="length">The maximum amount of the data to be dequeued.</param> | |
332 /// <param name="dequeued">The actual amount of the dequeued data.</param> | |
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { | |
334 if (buffer == null) | |
335 throw new ArgumentNullException("buffer"); | |
336 if (offset < 0) | |
337 throw new ArgumentOutOfRangeException("offset"); | |
338 if (length < 1 || offset + length > buffer.Length) | |
339 throw new ArgumentOutOfRangeException("length"); | |
340 | |
341 var chunk = m_first; | |
342 bool recycle; | |
343 dequeued = 0; | |
344 | |
345 while (chunk != null) { | |
346 | |
347 int actual; | |
348 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { | |
349 dequeued = actual; | |
350 } | |
351 | |
352 if (recycle) // this chunk is waste | |
353 RecycleFirstChunk(chunk); | |
354 | |
355 // if we have dequeued any data, then return | |
356 if (dequeued != 0) | |
357 return true; | |
358 | |
359 // we still may dequeue something | |
360 // try again | |
361 chunk = m_first; | |
362 } | |
363 | |
161 return false; | 364 return false; |
162 } | 365 } |
163 | 366 |
164 bool EnqueueChunk(Chunk last, Chunk chunk) { | 367 bool EnqueueChunk(Chunk last, Chunk chunk) { |
165 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | 368 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) |