Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 122:0c8685c8b56b v2
minor fixes and improvements of AsyncQueue, additional tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 22:20:45 +0300 |
parents | 62d2f1e98c4e |
children | f4d6ea6969cc |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Mon Jan 12 18:19:41 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Jan 12 22:20:45 2015 +0300 @@ -78,42 +78,31 @@ } public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { - int alloc; - int allocSize; - - // in case the batch size is larger than a free space in chunk - // tailGap is used to over allocate the space in the chunk to - // get exclusive permission on creation of the next one. - int tailGap = 0; - - do { - alloc = m_alloc; + //int alloc; + //int allocSize; - if (alloc > m_size) { - // the chunk is full and someone already - // creating the new one - enqueued = 0; // nothing was added - extend = false; // the caller shouldn't try to extend the queue - return false; // nothing was added - } + var alloc = Interlocked.Add(ref m_alloc, length) - length; + if (alloc > m_size) { + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added + } - allocSize = Math.Min(m_size - alloc, length); - if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch - tailGap = 1; // overallocate space to get exclusive permission to extend queue - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); - - extend = tailGap != 0; - enqueued = allocSize; + enqueued = Math.Min(m_size - alloc, length); + extend = length > enqueued; - // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 - if (alloc == m_size) + if (enqueued == 0) return false; - Array.Copy(batch, offset, m_data, alloc, allocSize); - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + Array.Copy(batch, offset, m_data, alloc, enqueued); + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { // spin wait for commit } + return true; } @@ -169,7 +158,7 @@ if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); if (EnqueueChunk(last, chunk)) - break; + break; // success! exit! last = m_last; } else { while (last == m_last) { @@ -326,7 +315,7 @@ /// Tries to dequeue all remaining data in the first chunk. /// </summary> /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> - /// <param name="buffer">The buffer to which data will be written.</param> + /// <param name="buffer">The buffer to which the data will be written.</param> /// <param name="offset">The offset in the buffer at which the data will be written.</param> /// <param name="length">Tha maximum amount of the data to be dequeued.</param> /// <param name="dequeued">The actual amount of the dequeued data.</param>