comparison Implab/Parallels/AsyncQueue.cs @ 125:f803565868a4 v2

improved performance of promises
author cin
date Thu, 15 Jan 2015 12:09:20 +0300
parents a336cb13c6a9
children d86da8d2d4c3
comparison
equal deleted inserted replaced
124:a336cb13c6a9 125:f803565868a4
145 } 145 }
146 146
147 public const int DEFAULT_CHUNK_SIZE = 32; 147 public const int DEFAULT_CHUNK_SIZE = 32;
148 public const int MAX_CHUNK_SIZE = 262144; 148 public const int MAX_CHUNK_SIZE = 262144;
149 149
150 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
151
152 Chunk m_first; 150 Chunk m_first;
153 Chunk m_last; 151 Chunk m_last;
154
155 public AsyncQueue() {
156 m_last = m_first = new Chunk(m_chunkSize);
157 }
158 152
159 /// <summary> 153 /// <summary>
160 /// Adds the specified value to the queue. 154 /// Adds the specified value to the queue.
161 /// </summary> 155 /// </summary>
162 /// <param name="value">The value which will be added to the queue.</param> 156 /// <param name="value">The value which will be added to the queue.</param>
165 // spin wait to the new chunk 159 // spin wait to the new chunk
166 bool extend = true; 160 bool extend = true;
167 while (last == null || !last.TryEnqueue(value, out extend)) { 161 while (last == null || !last.TryEnqueue(value, out extend)) {
168 // try to extend queue 162 // try to extend queue
169 if (extend || last == null) { 163 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value); 164 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
171 if (EnqueueChunk(last, chunk)) 165 if (EnqueueChunk(last, chunk))
172 break; // success! exit! 166 break; // success! exit!
173 last = m_last; 167 last = m_last;
174 } else { 168 } else {
175 while (last == m_last) { 169 while (last == m_last) {
213 while (length > 0) { 207 while (length > 0) {
214 208
215 var size = Math.Min(length, MAX_CHUNK_SIZE); 209 var size = Math.Min(length, MAX_CHUNK_SIZE);
216 210
217 var chunk = new Chunk( 211 var chunk = new Chunk(
218 Math.Max(size, m_chunkSize), 212 Math.Max(size, DEFAULT_CHUNK_SIZE),
219 data, 213 data,
220 offset, 214 offset,
221 size, 215 size,
222 length // length >= size 216 length // length >= size
223 ); 217 );
402 396
403 } 397 }
404 398
405 public void Clear() { 399 public void Clear() {
406 // start the new queue 400 // start the new queue
407 var chunk = new Chunk(m_chunkSize); 401 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
408 402
409 do { 403 do {
410 Thread.MemoryBarrier(); 404 Thread.MemoryBarrier();
411 var first = m_first; 405 var first = m_first;
412 var last = m_last; 406 var last = m_last;
429 } while(true); 423 } while(true);
430 } 424 }
431 425
432 public T[] Drain() { 426 public T[] Drain() {
433 // start the new queue 427 // start the new queue
434 var chunk = new Chunk(m_chunkSize); 428 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
435 429
436 do { 430 do {
437 Thread.MemoryBarrier(); 431 Thread.MemoryBarrier();
438 var first = m_first; 432 var first = m_first;
439 var last = m_last; 433 var last = m_last;
456 } while(true); 450 } while(true);
457 } 451 }
458 452
459 T[] ReadChunks(Chunk chunk, object last) { 453 T[] ReadChunks(Chunk chunk, object last) {
460 var result = new List<T>(); 454 var result = new List<T>();
461 var buffer = new T[m_chunkSize]; 455 var buffer = new T[DEFAULT_CHUNK_SIZE];
462 int actual; 456 int actual;
463 bool recycle; 457 bool recycle;
464 while (chunk != null) { 458 while (chunk != null) {
465 // ensure all write operations on the chunk are complete 459 // ensure all write operations on the chunk are complete
466 chunk.Commit(); 460 chunk.Commit();