Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 122:0c8685c8b56b v2
minor fixes and improvements of AsyncQueue, additional tests
| author | cin |
|---|---|
| date | Mon, 12 Jan 2015 22:20:45 +0300 |
| parents | 62d2f1e98c4e |
| children | f4d6ea6969cc |
comparison
equal
deleted
inserted
replaced
| 121:62d2f1e98c4e | 122:0c8685c8b56b |
|---|---|
| 76 | 76 |
| 77 return true; | 77 return true; |
| 78 } | 78 } |
| 79 | 79 |
| 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { |
| 81 int alloc; | 81 //int alloc; |
| 82 int allocSize; | 82 //int allocSize; |
| 83 | 83 |
| 84 // in case the batch size is larger than a free space in chunk | 84 var alloc = Interlocked.Add(ref m_alloc, length) - length; |
| 85 // tailGap is used to over allocate the space in the chunk to | 85 if (alloc > m_size) { |
| 86 // get exclusive permission on creation of the next one. | 86 // the chunk is full and someone already |
| 87 int tailGap = 0; | 87 // creating the new one |
| 88 | 88 enqueued = 0; // nothing was added |
| 89 do { | 89 extend = false; // the caller shouldn't try to extend the queue |
| 90 alloc = m_alloc; | 90 return false; // nothing was added |
| 91 | 91 } |
| 92 if (alloc > m_size) { | 92 |
| 93 // the chunk is full and someone already | 93 enqueued = Math.Min(m_size - alloc, length); |
| 94 // creating the new one | 94 extend = length > enqueued; |
| 95 enqueued = 0; // nothing was added | 95 |
| 96 extend = false; // the caller shouldn't try to extend the queue | 96 if (enqueued == 0) |
| 97 return false; // nothing was added | |
| 98 } | |
| 99 | |
| 100 allocSize = Math.Min(m_size - alloc, length); | |
| 101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch | |
| 102 tailGap = 1; // overallocate space to get exclusive permission to extend queue | |
| 103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); | |
| 104 | |
| 105 extend = tailGap != 0; | |
| 106 enqueued = allocSize; | |
| 107 | |
| 108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | |
| 109 if (alloc == m_size) | |
| 110 return false; | 97 return false; |
| 111 | 98 |
| 112 Array.Copy(batch, offset, m_data, alloc, allocSize); | 99 |
| 113 | 100 Array.Copy(batch, offset, m_data, alloc, enqueued); |
| 114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | 101 |
| 102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { | |
| 115 // spin wait for commit | 103 // spin wait for commit |
| 116 } | 104 } |
| 105 | |
| 117 return true; | 106 return true; |
| 118 } | 107 } |
| 119 | 108 |
| 120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { | 109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { |
| 121 int low, hi, batchSize; | 110 int low, hi, batchSize; |
| 167 while (last == null || !last.TryEnqueue(value, out extend)) { | 156 while (last == null || !last.TryEnqueue(value, out extend)) { |
| 168 // try to extend queue | 157 // try to extend queue |
| 169 if (extend || last == null) { | 158 if (extend || last == null) { |
| 170 var chunk = new Chunk(m_chunkSize, value); | 159 var chunk = new Chunk(m_chunkSize, value); |
| 171 if (EnqueueChunk(last, chunk)) | 160 if (EnqueueChunk(last, chunk)) |
| 172 break; | 161 break; // success! exit! |
| 173 last = m_last; | 162 last = m_last; |
| 174 } else { | 163 } else { |
| 175 while (last == m_last) { | 164 while (last == m_last) { |
| 176 Thread.MemoryBarrier(); | 165 Thread.MemoryBarrier(); |
| 177 } | 166 } |
| 324 | 313 |
| 325 /// <summary> | 314 /// <summary> |
| 326 /// Tries to dequeue all remaining data in the first chunk. | 315 /// Tries to dequeue all remaining data in the first chunk. |
| 327 /// </summary> | 316 /// </summary> |
| 328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> |
| 329 /// <param name="buffer">The buffer to which data will be written.</param> | 318 /// <param name="buffer">The buffer to which the data will be written.</param> |
| 330 /// <param name="offset">The offset in the buffer at which the data will be written.</param> | 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param> |
| 331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | 320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param> |
| 332 /// <param name="dequeued">The actual amount of the dequeued data.</param> | 321 /// <param name="dequeued">The actual amount of the dequeued data.</param> |
| 333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { | 322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { |
| 334 if (buffer == null) | 323 if (buffer == null) |
