Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 122:0c8685c8b56b v2
minor fixes and improvements of AsyncQueue, additional tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 22:20:45 +0300 |
parents | 62d2f1e98c4e |
children | f4d6ea6969cc |
comparison
equal
deleted
inserted
replaced
121:62d2f1e98c4e | 122:0c8685c8b56b |
---|---|
76 | 76 |
77 return true; | 77 return true; |
78 } | 78 } |
79 | 79 |
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { |
81 int alloc; | 81 //int alloc; |
82 int allocSize; | 82 //int allocSize; |
83 | 83 |
84 // in case the batch size is larger than a free space in chunk | 84 var alloc = Interlocked.Add(ref m_alloc, length) - length; |
85 // tailGap is used to over allocate the space in the chunk to | 85 if (alloc > m_size) { |
86 // get exclusive permission on creation of the next one. | 86 // the chunk is full and someone already |
87 int tailGap = 0; | 87 // creating the new one |
88 | 88 enqueued = 0; // nothing was added |
89 do { | 89 extend = false; // the caller shouldn't try to extend the queue |
90 alloc = m_alloc; | 90 return false; // nothing was added |
91 | 91 } |
92 if (alloc > m_size) { | 92 |
93 // the chunk is full and someone already | 93 enqueued = Math.Min(m_size - alloc, length); |
94 // creating the new one | 94 extend = length > enqueued; |
95 enqueued = 0; // nothing was added | 95 |
96 extend = false; // the caller shouldn't try to extend the queue | 96 if (enqueued == 0) |
97 return false; // nothing was added | |
98 } | |
99 | |
100 allocSize = Math.Min(m_size - alloc, length); | |
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch | |
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue | |
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); | |
104 | |
105 extend = tailGap != 0; | |
106 enqueued = allocSize; | |
107 | |
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 | |
109 if (alloc == m_size) | |
110 return false; | 97 return false; |
111 | 98 |
112 Array.Copy(batch, offset, m_data, alloc, allocSize); | 99 |
113 | 100 Array.Copy(batch, offset, m_data, alloc, enqueued); |
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | 101 |
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { | |
115 // spin wait for commit | 103 // spin wait for commit |
116 } | 104 } |
105 | |
117 return true; | 106 return true; |
118 } | 107 } |
119 | 108 |
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { | 109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { |
121 int low, hi, batchSize; | 110 int low, hi, batchSize; |
167 while (last == null || !last.TryEnqueue(value, out extend)) { | 156 while (last == null || !last.TryEnqueue(value, out extend)) { |
168 // try to extend queue | 157 // try to extend queue |
169 if (extend || last == null) { | 158 if (extend || last == null) { |
170 var chunk = new Chunk(m_chunkSize, value); | 159 var chunk = new Chunk(m_chunkSize, value); |
171 if (EnqueueChunk(last, chunk)) | 160 if (EnqueueChunk(last, chunk)) |
172 break; | 161 break; // success! exit! |
173 last = m_last; | 162 last = m_last; |
174 } else { | 163 } else { |
175 while (last == m_last) { | 164 while (last == m_last) { |
176 Thread.MemoryBarrier(); | 165 Thread.MemoryBarrier(); |
177 } | 166 } |
324 | 313 |
325 /// <summary> | 314 /// <summary> |
326 /// Tries to dequeue all remaining data in the first chunk. | 315 /// Tries to dequeue all remaining data in the first chunk. |
327 /// </summary> | 316 /// </summary> |
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> | 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> |
329 /// <param name="buffer">The buffer to which data will be written.</param> | 318 /// <param name="buffer">The buffer to which the data will be written.</param> |
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param> | 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param> |
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param> | 320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param> |
332 /// <param name="dequeued">The actual amount of the dequeued data.</param> | 321 /// <param name="dequeued">The actual amount of the dequeued data.</param> |
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { | 322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { |
334 if (buffer == null) | 323 if (buffer == null) |