Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 125:f803565868a4 v2
improved performance of promises
author | cin |
---|---|
date | Thu, 15 Jan 2015 12:09:20 +0300 |
parents | a336cb13c6a9 |
children | d86da8d2d4c3 |
comparison
equal
deleted
inserted
replaced
124:a336cb13c6a9 | 125:f803565868a4 |
---|---|
145 } | 145 } |
146 | 146 |
147 public const int DEFAULT_CHUNK_SIZE = 32; | 147 public const int DEFAULT_CHUNK_SIZE = 32; |
148 public const int MAX_CHUNK_SIZE = 262144; | 148 public const int MAX_CHUNK_SIZE = 262144; |
149 | 149 |
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | |
151 | |
152 Chunk m_first; | 150 Chunk m_first; |
153 Chunk m_last; | 151 Chunk m_last; |
154 | |
155 public AsyncQueue() { | |
156 m_last = m_first = new Chunk(m_chunkSize); | |
157 } | |
158 | 152 |
159 /// <summary> | 153 /// <summary> |
160 /// Adds the specified value to the queue. | 154 /// Adds the specified value to the queue. |
161 /// </summary> | 155 /// </summary> |
162 /// <param name="value">Tha value which will be added to the queue.</param> | 156 /// <param name="value">Tha value which will be added to the queue.</param> |
165 // spin wait to the new chunk | 159 // spin wait to the new chunk |
166 bool extend = true; | 160 bool extend = true; |
167 while (last == null || !last.TryEnqueue(value, out extend)) { | 161 while (last == null || !last.TryEnqueue(value, out extend)) { |
168 // try to extend queue | 162 // try to extend queue |
169 if (extend || last == null) { | 163 if (extend || last == null) { |
170 var chunk = new Chunk(m_chunkSize, value); | 164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); |
171 if (EnqueueChunk(last, chunk)) | 165 if (EnqueueChunk(last, chunk)) |
172 break; // success! exit! | 166 break; // success! exit! |
173 last = m_last; | 167 last = m_last; |
174 } else { | 168 } else { |
175 while (last == m_last) { | 169 while (last == m_last) { |
213 while (length > 0) { | 207 while (length > 0) { |
214 | 208 |
215 var size = Math.Min(length, MAX_CHUNK_SIZE); | 209 var size = Math.Min(length, MAX_CHUNK_SIZE); |
216 | 210 |
217 var chunk = new Chunk( | 211 var chunk = new Chunk( |
218 Math.Max(size, m_chunkSize), | 212 Math.Max(size, DEFAULT_CHUNK_SIZE), |
219 data, | 213 data, |
220 offset, | 214 offset, |
221 size, | 215 size, |
222 length // length >= size | 216 length // length >= size |
223 ); | 217 ); |
402 | 396 |
403 } | 397 } |
404 | 398 |
405 public void Clear() { | 399 public void Clear() { |
406 // start the new queue | 400 // start the new queue |
407 var chunk = new Chunk(m_chunkSize); | 401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE); |
408 | 402 |
409 do { | 403 do { |
410 Thread.MemoryBarrier(); | 404 Thread.MemoryBarrier(); |
411 var first = m_first; | 405 var first = m_first; |
412 var last = m_last; | 406 var last = m_last; |
429 } while(true); | 423 } while(true); |
430 } | 424 } |
431 | 425 |
432 public T[] Drain() { | 426 public T[] Drain() { |
433 // start the new queue | 427 // start the new queue |
434 var chunk = new Chunk(m_chunkSize); | 428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE); |
435 | 429 |
436 do { | 430 do { |
437 Thread.MemoryBarrier(); | 431 Thread.MemoryBarrier(); |
438 var first = m_first; | 432 var first = m_first; |
439 var last = m_last; | 433 var last = m_last; |
456 } while(true); | 450 } while(true); |
457 } | 451 } |
458 | 452 |
459 T[] ReadChunks(Chunk chunk, object last) { | 453 T[] ReadChunks(Chunk chunk, object last) { |
460 var result = new List<T>(); | 454 var result = new List<T>(); |
461 var buffer = new T[m_chunkSize]; | 455 var buffer = new T[DEFAULT_CHUNK_SIZE]; |
462 int actual; | 456 int actual; |
463 bool recycle; | 457 bool recycle; |
464 while (chunk != null) { | 458 while (chunk != null) { |
465 // ensure all write operations on the chunk are complete | 459 // ensure all write operations on the chunk are complete |
466 chunk.Commit(); | 460 chunk.Commit(); |