Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 233:d6fe09f5592c v2
Improved AsyncQueue
Removed ImplabFx
author | cin |
---|---|
date | Wed, 04 Oct 2017 15:44:47 +0300 |
parents | 8d5de4eb9c2c |
children | 8dd666e6b6bf |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Tue Sep 12 19:07:42 2017 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Wed Oct 04 15:44:47 2017 +0300 @@ -7,11 +7,11 @@ namespace Implab.Parallels { public class AsyncQueue<T> : IEnumerable<T> { class Chunk { - public Chunk next; + public volatile Chunk next; - int m_low; - int m_hi; - int m_alloc; + volatile int m_low; + volatile int m_hi; + volatile int m_alloc; readonly int m_size; readonly T[] m_data; @@ -28,12 +28,15 @@ m_data[0] = value; } - public Chunk(int size, T[] data, int offset, int length, int alloc) { + public Chunk(int size, int allocated) { m_size = size; - m_hi = length; - m_alloc = alloc; + m_hi = allocated; + m_alloc = allocated; m_data = new T[size]; - Array.Copy(data, offset, m_data, 0, length); + } + + public void WriteData(T[] data, int offset, int dest, int length) { + Array.Copy(data, offset, m_data, dest, length); } public int Low { @@ -48,31 +51,36 @@ get { return m_size; } } - public bool TryEnqueue(T value, out bool extend) { - var alloc = Interlocked.Increment(ref m_alloc) - 1; - - if (alloc >= m_size) { - extend = alloc == m_size; - return false; - } - - extend = false; + public bool TryEnqueue(T value) { + int alloc; + do { + alloc = m_alloc; + if (alloc >= m_size) + return false; + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc)); + m_data[alloc] = value; - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + SpinWait spin = new SpinWait(); + // m_hi is volatile + while (alloc != m_hi) { // spin wait for commit + spin.SpinOnce(); } + m_hi = alloc + 1; + return true; } /// <summary> /// Prevents from allocating new space in the chunk and waits for all write operations to complete /// </summary> - public void Commit() { - var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); - - while (m_hi != actual) - Thread.MemoryBarrier(); + public void Seal() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size); + SpinWait spin = new SpinWait(); + while (m_hi != actual) { + spin.SpinOnce(); + } } public bool TryDequeue(out T value, out bool recycle) { @@ -84,44 +92,38 @@ recycle = (low == m_size); return false; } - } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low)); - recycle = (low == m_size - 1); + recycle = (low + 1 == m_size); value = m_data[low]; return true; } - public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { - //int alloc; - //int allocSize; + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) { + int alloc; + do { + alloc = m_alloc; + if (alloc >= m_size) { + enqueued = 0; + return false; + } else { + enqueued = Math.Min(length, m_size - alloc); + } + } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc)); + + Array.Copy(batch, offset, m_data, alloc, enqueued); - var alloc = Interlocked.Add(ref m_alloc, length) - length; - if (alloc > m_size) { - // the chunk is full and someone already - // creating the new one - enqueued = 0; // nothing was added - extend = false; // the caller shouldn't try to extend the queue - return false; // nothing was added + SpinWait spin = new SpinWait(); + while (alloc != m_hi) { + spin.SpinOnce(); } - enqueued = Math.Min(m_size - alloc, length); - extend = length > enqueued; - - if (enqueued == 0) - return false; - - - Array.Copy(batch, offset, m_data, alloc, enqueued); - - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { - // spin wait for commit - } - + m_hi = alloc + enqueued; return true; } - public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { + public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) { int low, hi, batchSize; do { @@ -129,15 +131,14 @@ hi = m_hi; if (low >= hi) { dequeued = 0; - recycle = (low == m_size); // recycling could be restarted and we need to signal again + recycle = (low == m_size); return false; } batchSize = Math.Min(hi - low, length); - } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); + } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); - recycle = (low == m_size - batchSize); dequeued = batchSize; - + recycle = (low + batchSize == m_size); Array.Copy(m_data, low, buffer, offset, batchSize); return true; @@ -149,32 +150,33 @@ } public const int DEFAULT_CHUNK_SIZE = 32; - public const int MAX_CHUNK_SIZE = 262144; + public const int MAX_CHUNK_SIZE = 256; Chunk m_first; Chunk m_last; + public AsyncQueue() { + m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE); + } + /// <summary> /// Adds the specified value to the queue. /// </summary> /// <param name="value">Tha value which will be added to the queue.</param> - public virtual void Enqueue(T value) { + public void Enqueue(T value) { var last = m_last; - // spin wait to the new chunk - bool extend = true; - while (last == null || !last.TryEnqueue(value, out extend)) { + SpinWait spin = new SpinWait(); + while (!last.TryEnqueue(value)) { // try to extend queue - if (extend || last == null) { - var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); - if (EnqueueChunk(last, chunk)) - break; // success! exit! - last = m_last; + var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); + var t = Interlocked.CompareExchange(ref m_last, chunk, last); + if (t == last) { + last.next = chunk; + break; } else { - while (last == m_last) { - Thread.MemoryBarrier(); - } - last = m_last; + last = t; } + spin.SpinOnce(); } } @@ -184,67 +186,54 @@ /// <param name="data">The buffer which contains the data to be enqueued.</param> /// <param name="offset">The offset of the data in the buffer.</param> /// <param name="length">The size of the data to read from the buffer.</param> - public virtual void EnqueueRange(T[] data, int offset, int length) { + public void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); - if (length == 0) - return; if (offset < 0) throw new ArgumentOutOfRangeException("offset"); if (length < 1 || offset + length > data.Length) throw new ArgumentOutOfRangeException("length"); - var last = m_last; + while (length > 0) { + var last = m_last; + int enqueued; - bool extend; - int enqueued; - - while (length > 0) { - extend = true; - if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { + if (last.TryEnqueueBatch(data, offset, length, out enqueued)) { length -= enqueued; offset += enqueued; } - if (extend) { - // there was no enough space in the chunk - // or there was no chunks in the queue + if (length > 0) { + // we have something to enqueue - while (length > 0) { - - var size = Math.Min(length, MAX_CHUNK_SIZE); + var tail = length % MAX_CHUNK_SIZE; - var chunk = new Chunk( - Math.Max(size, DEFAULT_CHUNK_SIZE), - data, - offset, - size, - length // length >= size - ); + var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail); + + if (last != Interlocked.CompareExchange(ref m_last, chunk, last)) + continue; // we wasn't able to catch the writer, roundtrip - if (!EnqueueChunk(last, chunk)) { - // looks like the queue has been updated then proceed from the beginning - last = m_last; - break; - } + // we are lucky + // we can exclusively write our batch, the other writers will continue their work + + length -= tail; - // we have successfully added the new chunk - last = chunk; - length -= size; - offset += size; - } - } else { - // we don't need to extend the queue, if we successfully enqueued data - if (length == 0) - break; - - // if we need to wait while someone is extending the queue - // spinwait - while (last == m_last) { - Thread.MemoryBarrier(); + + for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) { + var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE); + node.WriteData(data, offset, 0, MAX_CHUNK_SIZE); + offset += MAX_CHUNK_SIZE; + // fence last.next is volatile + last.next = node; + last = node; } - last = m_last; + if (tail > 0) + chunk.WriteData(data, offset, 0, tail); + + // fence last.next is volatile + last.next = chunk; + return; } } } @@ -256,26 +245,21 @@ /// <param name="value">The value of the dequeued element.</param> public bool TryDequeue(out T value) { var chunk = m_first; - bool recycle; - while (chunk != null) { + do { + bool recycle; var result = chunk.TryDequeue(out value, out recycle); - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - else + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { return result; // this chunk is usable and returned actual result + } if (result) // this chunk is waste but the true result is always actual return true; - - // try again - chunk = m_first; - } - - // the queue is empty - value = default(T); - return false; + } while (true); } /// <summary> @@ -295,10 +279,9 @@ throw new ArgumentOutOfRangeException("length"); var chunk = m_first; - bool recycle; dequeued = 0; - while (chunk != null) { - + do { + bool recycle; int actual; if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { offset += actual; @@ -306,18 +289,16 @@ dequeued += actual; } - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - else if (actual == 0) - break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { + chunk = null; + } if (length == 0) return true; - - // we still may dequeue something - // try again - chunk = m_first; - } + } while (chunk != null); return dequeued != 0; } @@ -339,123 +320,81 @@ throw new ArgumentOutOfRangeException("length"); var chunk = m_first; - bool recycle; - dequeued = 0; - - while (chunk != null) { + do { + bool recycle; + chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle); - int actual; - if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { - dequeued = actual; + if (recycle && chunk.next != null) { + // this chunk is waste + chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk); + } else { + chunk = null; } - if (recycle) // this chunk is waste - RecycleFirstChunk(chunk); - // if we have dequeued any data, then return if (dequeued != 0) return true; - // we still may dequeue something - // try again - chunk = m_first; - } + } while (chunk != null); return false; } - - bool EnqueueChunk(Chunk last, Chunk chunk) { - if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) - return false; - - if (last != null) - last.next = chunk; - else { - m_first = chunk; - } - return true; - } - - void RecycleFirstChunk(Chunk first) { - var next = first.next; - - if (first != Interlocked.CompareExchange(ref m_first, next, first)) - return; - - if (next == null) { - - if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - - // race - // someone already updated the tail, restore the pointer to the queue head - m_first = first; - } - // the tail is updated - } - } + public void Clear() { // start the new queue var chunk = new Chunk(DEFAULT_CHUNK_SIZE); - do { - Thread.MemoryBarrier(); var first = m_first; - var last = m_last; - - if (last == null) // nothing to clear - return; - - if (first == null || (first.next == null && first != last)) // inconcistency + if (first.next == null && first != m_last) { continue; - - // here we will create inconsistency which will force others to spin - // and prevent from fetching. chunk.next = null - if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) - continue;// inconsistent - - m_last = chunk; - - return; - - } while(true); - } - - public T[] Drain() { - // start the new queue - var chunk = new Chunk(DEFAULT_CHUNK_SIZE); - - do { - Thread.MemoryBarrier(); - var first = m_first; - var last = m_last; - - if (last == null) - return new T[0]; - - if (first == null || (first.next == null && first != last)) - continue; + } // here we will create inconsistency which will force others to spin // and prevent from fetching. chunk.next = null if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) continue;// inconsistent - last = Interlocked.Exchange(ref m_last, chunk); + m_last = chunk; + return; + } while (true); + } + + public List<T> Drain() { + // start the new queue + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); + + do { + var first = m_first; + // first.next is volatile + if (first.next == null) { + if (first != m_last) + continue; + else if (first.Hi == first.Low) + return new List<T>(); + } + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + var last = Interlocked.Exchange(ref m_last, chunk); return ReadChunks(first, last); - } while(true); + } while (true); } - - static T[] ReadChunks(Chunk chunk, object last) { + + static List<T> ReadChunks(Chunk chunk, object last) { var result = new List<T>(); - var buffer = new T[DEFAULT_CHUNK_SIZE]; + var buffer = new T[MAX_CHUNK_SIZE]; int actual; bool recycle; + SpinWait spin = new SpinWait(); while (chunk != null) { // ensure all write operations on the chunk are complete - chunk.Commit(); + chunk.Seal(); // we need to read the chunk using this way // since some client still may completing the dequeue @@ -467,12 +406,12 @@ chunk = null; } else { while (chunk.next == null) - Thread.MemoryBarrier(); + spin.SpinOnce(); chunk = chunk.next; } } - return result.ToArray(); + return result; } struct ArraySegmentCollection : ICollection<T> { @@ -501,7 +440,7 @@ } public void CopyTo(T[] array, int arrayIndex) { - Array.Copy(m_data,m_offset,array,arrayIndex, m_length); + Array.Copy(m_data, m_offset, array, arrayIndex, m_length); } public bool Remove(T item) {