Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 18:19:41 +0300 |
parents | f1b897999260 |
children | 0c8685c8b56b |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Mon Jan 12 05:19:52 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Jan 12 18:19:41 2015 +0300 @@ -27,6 +27,14 @@ m_data[0] = value; } + public Chunk(int size, T[] data, int offset, int length, int alloc) { + m_size = size; + m_hi = length; + m_alloc = alloc; + m_data = new T[size]; + Array.Copy(data, offset, m_data, 0, length); + } + public int Low { get { return m_low; } } @@ -35,7 +43,7 @@ get { return m_hi; } } - public bool TryEnqueue(T value,out bool extend) { + public bool TryEnqueue(T value, out bool extend) { var alloc = Interlocked.Increment(ref m_alloc) - 1; if (alloc >= m_size) { @@ -52,7 +60,7 @@ return true; } - public bool TryDequeue(out T value,out bool recycle) { + public bool TryDequeue(out T value, out bool recycle) { int low; do { low = m_low; @@ -73,27 +81,35 @@ int alloc; int allocSize; + // in case the batch size is larger than a free space in chunk + // tailGap is used to over allocate the space in the chunk to + // get exclusive permission on creation of the next one. + int tailGap = 0; + do { alloc = m_alloc; if (alloc > m_size) { - enqueued = 0; - extend = false; - return false; + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added } - allocSize = Math.Min(m_size - m_alloc, length); - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); + allocSize = Math.Min(m_size - alloc, length); + if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch + tailGap = 1; // overallocate space to get exclusive permission to extend queue + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); + + extend = tailGap != 0; + enqueued = allocSize; - if (alloc == m_size) { - enqueued = 0; - extend = true; + // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 + if (alloc == m_size) return false; - } Array.Copy(batch, offset, m_data, alloc, allocSize); - enqueued = allocSize; - extend = false; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { // spin wait for commit @@ -101,12 +117,35 @@ return true; } + public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { + int low, hi, batchSize; + + do { + low = m_low; + hi = m_hi; + if (low >= hi) { + dequeued = 0; + recycle = (low == m_size); // recycling could be restarted and we need to signal again + return false; + } + batchSize = Math.Min(hi - low, length); + } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); + + recycle = (low == m_size - batchSize); + dequeued = batchSize; + + Array.Copy(m_data, low, buffer, offset, batchSize); + + return true; + } + public T GetAt(int pos) { return m_data[pos]; } } public const int DEFAULT_CHUNK_SIZE = 32; + public const int MAX_CHUNK_SIZE = 262144; readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; @@ -117,11 +156,15 @@ m_last = m_first = new Chunk(m_chunkSize); } + /// <summary> + /// Adds the specified value to the queue. + /// </summary> + /// <param name="value">Tha value which will be added to the queue.</param> public void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; - while(last == null || !last.TryEnqueue(value, out extend)) { + while (last == null || !last.TryEnqueue(value, out extend)) { // try to extend queue if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); @@ -129,14 +172,88 @@ break; last = m_last; } else { - while (last != m_last) { + while (last == m_last) { Thread.MemoryBarrier(); - last = m_last; } + last = m_last; } } } + /// <summary> + /// Adds the specified data to the queue. + /// </summary> + /// <param name="data">The buffer which contains the data to be enqueued.</param> + /// <param name="offset">The offset of the data in the buffer.</param> + /// <param name="length">The size of the data to read from the buffer.</param> + public void EnqueueRange(T[] data, int offset, int length) { + if (data == null) + throw new ArgumentNullException("data"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > data.Length) + throw new ArgumentOutOfRangeException("length"); + + var last = m_last; + + bool extend; + int enqueued; + + while (length > 0) { + extend = true; + if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { + length -= enqueued; + offset += enqueued; + } + + if (extend) { + // there was no enough space in the chunk + // or there was no chunks in the queue + + while (length > 0) { + + var size = Math.Min(length, MAX_CHUNK_SIZE); + + var chunk = new Chunk( + Math.Max(size, m_chunkSize), + data, + offset, + size, + length // length >= size + ); + + if (!EnqueueChunk(last, chunk)) { + // looks like the queue has been updated then proceed from the beginning + last = m_last; + break; + } + + // we have successfully added the new chunk + last = chunk; + length -= size; + offset += size; + } + } else { + // we don't need to extend the queue, if we successfully enqueued data + if (length == 0) + break; + + // if we need to wait while someone is extending the queue + // spinwait + while (last == m_last) { + Thread.MemoryBarrier(); + } + + last = m_last; + } + } + } + + /// <summary> + /// Tries to retrieve the first element from the queue. + /// </summary> + /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> + /// <param name="value">The value of the dequeued element.</param> public bool TryDequeue(out T value) { var chunk = m_first; bool recycle; @@ -161,6 +278,92 @@ return false; } + /// <summary> + /// Tries to dequeue the specified amount of data from the queue. + /// </summary> + /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns> + /// <param name="buffer">The buffer to which the data will be written.</param> + /// <param name="offset">The offset in the buffer at which the data will be written.</param> + /// <param name="length">The maximum amount of data to be retrieved.</param> + /// <param name="dequeued">The actual amout of the retrieved data.</param> + public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + offset += actual; + length -= actual; + dequeued += actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else if (actual == 0) + break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) + + if (length == 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return dequeued != 0; + } + + /// <summary> + /// Tries to dequeue all remaining data in the first chunk. + /// </summary> + /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> + /// <param name="buffer">The buffer to which data will be written.</param> + /// <param name="offset">The offset in the buffer at which the data will be written.</param> + /// <param name="length">Tha maximum amount of the data to be dequeued.</param> + /// <param name="dequeued">The actual amount of the dequeued data.</param> + public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + dequeued = actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + + // if we have dequeued any data, then return + if (dequeued != 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return false; + } + bool EnqueueChunk(Chunk last, Chunk chunk) { if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) return false;