Mercurial > pub > ImplabNet
view Implab/Parallels/AsyncQueue.cs @ 239:eedf4d834e67 v2
fix
author | cin |
---|---|
date | Wed, 13 Dec 2017 19:54:45 +0300 |
parents | 8dd666e6b6bf |
children |
line wrap: on
line source
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// A FIFO queue stored as a singly linked list of fixed-size chunks.
    /// Producers and consumers coordinate through <see cref="Interlocked"/>
    /// operations and spin-waiting instead of locks.
    /// </summary>
    /// <typeparam name="T">The type of the queued items.</typeparam>
    public class AsyncQueue<T> : IEnumerable<T> {
        /// <summary>
        /// A fixed-size segment of the queue. Slots are reserved by writers
        /// (<c>m_alloc</c>), published to readers (<c>m_hi</c>) and consumed
        /// by readers (<c>m_low</c>); a slot is never reused.
        /// </summary>
        class Chunk {
            /// <summary>The chunk linked after this one, <c>null</c> until one is attached.</summary>
            public volatile Chunk next;

            // index of the next slot to be dequeued
            volatile int m_low;
            // number of leading slots whose contents are visible to readers
            volatile int m_hi;
            // number of leading slots already reserved by writers
            volatile int m_alloc;
            readonly int m_size;
            readonly T[] m_data;

            /// <summary>Creates an empty chunk with the specified capacity.</summary>
            public Chunk(int size) {
                m_size = size;
                m_data = new T[size];
            }

            /// <summary>Creates a chunk which already contains a single published item.</summary>
            public Chunk(int size, T value) {
                m_size = size;
                m_hi = 1;
                m_alloc = 1;
                m_data = new T[size];
                m_data[0] = value;
            }

            /// <summary>
            /// Creates a chunk whose first <paramref name="allocated"/> slots are
            /// marked as reserved and published; the caller is expected to fill
            /// them through <see cref="WriteData"/>.
            /// </summary>
            public Chunk(int size, int allocated) {
                m_size = size;
                m_hi = allocated;
                m_alloc = allocated;
                m_data = new T[size];
            }

            /// <summary>Copies items into the chunk storage without touching any counter.</summary>
            public void WriteData(T[] data, int offset, int dest, int length) {
                Array.Copy(data, offset, m_data, dest, length);
            }

            /// <summary>Index of the next slot to be dequeued.</summary>
            public int Low {
                get { return m_low; }
            }

            /// <summary>Number of leading slots published to readers.</summary>
            public int Hi {
                get { return m_hi; }
            }

            /// <summary>Capacity of the chunk.</summary>
            public int Size {
                get { return m_size; }
            }

            /// <summary>Spins until exactly <paramref name="mark"/> slots are published.</summary>
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
            void AwaitWrites(int mark) {
                if (m_hi != mark) {
                    SpinWait spin = new SpinWait();
                    do {
                        spin.SpinOnce();
                    } while (m_hi != mark);
                }
            }

            /// <summary>
            /// Reserves one slot, stores the value and publishes it.
            /// </summary>
            /// <returns><c>false</c> when no free slot is left in the chunk.</returns>
            public bool TryEnqueue(T value) {
                int alloc;
                do {
                    alloc = m_alloc;
                    if (alloc >= m_size)
                        return false;
                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));

                m_data[alloc] = value;

                // slots are published strictly in order: wait for the writers
                // which reserved the preceding slots
                AwaitWrites(alloc);
                m_hi = alloc + 1;

                return true;
            }

            /// <summary>
            /// Prevents from allocating new space in the chunk and waits for all write operations to complete
            /// </summary>
            public void Seal() {
                var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
                AwaitWrites(actual);
            }

            /// <summary>
            /// Takes the next published item from the chunk.
            /// </summary>
            /// <param name="value">The dequeued item or the default value.</param>
            /// <param name="recycle"><c>true</c> when the read position has reached the end of the chunk.</param>
            /// <returns><c>true</c> if an item was dequeued.</returns>
            public bool TryDequeue(out T value, out bool recycle) {
                int low;
                do {
                    low = m_low;
                    if (low >= m_hi) {
                        value = default(T);
                        recycle = (low == m_size);
                        return false;
                    }
                } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));

                recycle = (low + 1 == m_size);
                value = m_data[low];

                return true;
            }

            /// <summary>
            /// Reserves as many slots as possible (up to <paramref name="length"/>),
            /// copies the items and publishes them.
            /// </summary>
            /// <param name="enqueued">The number of items actually stored.</param>
            /// <returns><c>false</c> when no free slot is left in the chunk.</returns>
            public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
                int alloc;
                do {
                    alloc = m_alloc;
                    if (alloc >= m_size) {
                        enqueued = 0;
                        return false;
                    } else {
                        enqueued = Math.Min(length, m_size - alloc);
                    }
                } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));

                Array.Copy(batch, offset, m_data, alloc, enqueued);

                AwaitWrites(alloc);
                m_hi = alloc + enqueued;
                return true;
            }

            /// <summary>
            /// Takes up to <paramref name="length"/> published items from the chunk.
            /// </summary>
            /// <param name="dequeued">The number of items copied to the buffer.</param>
            /// <param name="recycle"><c>true</c> when the read position has reached the end of the chunk.</param>
            /// <returns><c>true</c> if at least one item was dequeued.</returns>
            public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
                int low, hi, batchSize;

                do {
                    low = m_low;
                    hi = m_hi;
                    if (low >= hi) {
                        dequeued = 0;
                        recycle = (low == m_size);
                        return false;
                    }
                    batchSize = Math.Min(hi - low, length);
                } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));

                dequeued = batchSize;
                recycle = (low + batchSize == m_size);
                Array.Copy(m_data, low, buffer, offset, batchSize);

                return true;
            }

            /// <summary>Reads the slot at the specified position without any checks.</summary>
            public T GetAt(int pos) {
                return m_data[pos];
            }
        }

        public const int DEFAULT_CHUNK_SIZE = 32;
        public const int MAX_CHUNK_SIZE = 256;

        Chunk m_first;
        Chunk m_last;

        /// <summary>Creates an empty queue with a single chunk of the default size.</summary>
        public AsyncQueue() {
            m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
        }

        /// <summary>
        /// Adds the specified value to the queue.
        /// </summary>
        /// <param name="value">The value which will be added to the queue.</param>
        public void Enqueue(T value) {
            var last = m_last;
            SpinWait spin = new SpinWait();
            while (!last.TryEnqueue(value)) {
                // the last chunk is full, try to extend the queue with a chunk
                // which already holds the value
                var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
                var t = Interlocked.CompareExchange(ref m_last, chunk, last);
                if (t == last) {
                    // we own the tail, make the new chunk reachable for readers
                    last.next = chunk;
                    break;
                } else {
                    // someone else has replaced the tail, retry with it
                    last = t;
                }
                spin.SpinOnce();
            }
        }

        /// <summary>
        /// Adds the specified data to the queue.
        /// </summary>
        /// <param name="data">The buffer which contains the data to be enqueued.</param>
        /// <param name="offset">The offset of the data in the buffer.</param>
        /// <param name="length">The size of the data to read from the buffer.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
        /// or the range does not fit into <paramref name="data"/>.
        /// </exception>
        public void EnqueueRange(T[] data, int offset, int length) {
            if (data == null)
                throw new ArgumentNullException("data");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // written as a subtraction since 'offset + length' may overflow and
            // let an invalid range through, leaving reserved but never
            // published slots in the chunk
            if (length < 1 || length > data.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            while (length > 0) {
                var last = m_last;
                int enqueued;

                if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
                    length -= enqueued;
                    offset += enqueued;
                }

                if (length > 0) {
                    // we have something to enqueue

                    var tail = length % MAX_CHUNK_SIZE;

                    // the new tail of the queue, it will receive the remainder
                    // which doesn't fill a chunk of the maximum size
                    var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);

                    if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
                        continue; // we weren't able to catch the writer, roundtrip

                    // we are lucky
                    // we can exclusively write our batch, the other writers will continue their work

                    length -= tail;

                    // what is left is a multiple of MAX_CHUNK_SIZE, it goes
                    // into completely filled chunks linked before the new tail
                    for (var i = 0; i < length; i += MAX_CHUNK_SIZE) {
                        var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
                        node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
                        offset += MAX_CHUNK_SIZE;
                        // fence last.next is volatile
                        last.next = node;
                        last = node;
                    }

                    if (tail > 0)
                        chunk.WriteData(data, offset, 0, tail);

                    // fence last.next is volatile
                    last.next = chunk;
                    return;
                }
            }
        }

        /// <summary>
        /// Tries to retrieve the first element from the queue.
        /// </summary>
        /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
        /// <param name="value">The value of the dequeued element.</param>
        public bool TryDequeue(out T value) {
            var chunk = m_first;
            do {
                bool recycle;

                var result = chunk.TryDequeue(out value, out recycle);

                if (recycle && chunk.next != null) {
                    // this chunk is waste, CompareExchange returns the previous
                    // head, so after a successful swap one more pass is made
                    // over the exhausted chunk before the new head is picked up
                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
                } else {
                    return result; // this chunk is usable and returned actual result
                }

                if (result) // this chunk is waste but the true result is always actual
                    return true;
            } while (true);
        }

        /// <summary>
        /// Tries to dequeue the specified amount of data from the queue.
        /// </summary>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of data to be retrieved.</param>
        /// <param name="dequeued">The actual amount of the retrieved data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
        /// or the range does not fit into <paramref name="buffer"/>.
        /// </exception>
        public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // subtraction is used to avoid the overflow of 'offset + length'
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            dequeued = 0;
            do {
                bool recycle;
                int actual;

                if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
                    offset += actual;
                    length -= actual;
                    dequeued += actual;
                }

                if (recycle && chunk.next != null) {
                    // this chunk is waste
                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
                } else {
                    chunk = null;
                }

                if (length == 0)
                    return true;
            } while (chunk != null);

            return dequeued != 0;
        }

        /// <summary>
        /// Tries to dequeue all remaining data in the first chunk.
        /// </summary>
        /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
        /// <param name="buffer">The buffer to which the data will be written.</param>
        /// <param name="offset">The offset in the buffer at which the data will be written.</param>
        /// <param name="length">The maximum amount of the data to be dequeued.</param>
        /// <param name="dequeued">The actual amount of the dequeued data.</param>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="offset"/> is negative, <paramref name="length"/> is less than 1
        /// or the range does not fit into <paramref name="buffer"/>.
        /// </exception>
        public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            // subtraction is used to avoid the overflow of 'offset + length'
            if (length < 1 || length > buffer.Length - offset)
                throw new ArgumentOutOfRangeException("length");

            var chunk = m_first;
            do {
                bool recycle;
                chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);

                if (recycle && chunk.next != null) {
                    // this chunk is waste
                    chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
                } else {
                    chunk = null;
                }

                // if we have dequeued any data, then return
                if (dequeued != 0)
                    return true;

            } while (chunk != null);

            return false;
        }

        /// <summary>
        /// Discards the contents of the queue by replacing its chunks with a
        /// single empty chunk.
        /// </summary>
        public void Clear() {
            // start the new queue
            var chunk = new Chunk(DEFAULT_CHUNK_SIZE);

            do {
                var first = m_first;
                if (first.next == null && first != m_last) {
                    // the head has no successor but isn't the tail: a writer
                    // is in the middle of linking a chunk, retry
                    continue;
                }

                // here we will create inconsistency which will force others to spin
                // and prevent from fetching. chunk.next = null
                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
                    continue; // inconsistent

                m_last = chunk;
                return;
            } while (true);
        }

        /// <summary>
        /// Detaches all chunks from the queue and returns their contents.
        /// </summary>
        /// <returns>The items read from the detached chunks, an empty list if the queue has no items.</returns>
        public List<T> Drain() {
            Chunk chunk = null;
            do {
                var first = m_first;
                // first.next is volatile
                if (first.next == null) {
                    if (first != m_last)
                        continue; // a writer is linking a chunk, retry
                    else if (first.Hi == first.Low)
                        return new List<T>(); // the only chunk has no unread items
                }

                // start the new queue
                if (chunk == null)
                    chunk = new Chunk(DEFAULT_CHUNK_SIZE);

                // here we will create inconsistency which will force others to spin
                // and prevent from fetching. chunk.next = null
                if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
                    continue; // inconsistent

                var last = Interlocked.Exchange(ref m_last, chunk);

                return ReadChunks(first, last);

            } while (true);
        }

        /// <summary>
        /// Reads the items of the chain which starts at <paramref name="chunk"/>
        /// and ends at <paramref name="last"/> (inclusive).
        /// </summary>
        static List<T> ReadChunks(Chunk chunk, object last) {
            var result = new List<T>();
            var buffer = new T[MAX_CHUNK_SIZE];
            int actual;
            bool recycle;
            SpinWait spin = new SpinWait();
            while (chunk != null) {
                // ensure all write operations on the chunk are complete
                chunk.Seal();

                // we need to read the chunk using this way
                // since some client still may completing the dequeue
                // operation, such clients most likely won't get results
                while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
                    result.AddRange(new ArraySegmentCollection(buffer, 0, actual));

                if (chunk == last) {
                    chunk = null;
                } else {
                    // the link to the next chunk may be not written yet
                    while (chunk.next == null)
                        spin.SpinOnce();
                    chunk = chunk.next;
                }
            }

            return result;
        }

        /// <summary>
        /// A read-only collection view over a part of an array, it is used to
        /// append a part of the buffer to a list with a single call.
        /// </summary>
        struct ArraySegmentCollection : ICollection<T> {
            readonly T[] m_data;
            readonly int m_offset;
            readonly int m_length;

            public ArraySegmentCollection(T[] data, int offset, int length) {
                m_data = data;
                m_offset = offset;
                m_length = length;
            }

            #region ICollection implementation

            public void Add(T item) {
                throw new NotSupportedException();
            }

            public void Clear() {
                throw new NotSupportedException();
            }

            public bool Contains(T item) {
                // search only the part of the array covered by this view
                return Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
            }

            public void CopyTo(T[] array, int arrayIndex) {
                Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
            }

            public bool Remove(T item) {
                throw new NotSupportedException();
            }

            public int Count {
                get { return m_length; }
            }

            public bool IsReadOnly {
                get { return true; }
            }

            #endregion

            #region IEnumerable implementation

            public IEnumerator<T> GetEnumerator() {
                for (int i = m_offset; i < m_length + m_offset; i++)
                    yield return m_data[i];
            }

            #endregion

            #region IEnumerable implementation

            IEnumerator IEnumerable.GetEnumerator() {
                return GetEnumerator();
            }

            #endregion
        }

        #region IEnumerable implementation

        /// <summary>
        /// Walks the chunks starting from the specified one and reads the
        /// slots between the read position and the published position of
        /// every chunk. The chunks are read as they are at the moment of
        /// the access, no snapshot is taken.
        /// </summary>
        class Enumerator : IEnumerator<T> {
            Chunk m_current;
            int m_pos = -1;

            public Enumerator(Chunk first) {
                m_current = first;
            }

            #region IEnumerator implementation

            public bool MoveNext() {
                if (m_current == null)
                    return false;

                if (m_pos == -1)
                    m_pos = m_current.Low;
                else
                    m_pos++;

                // skip every chunk which has nothing to read at the current
                // position, the last chunk of the queue can be empty
                while (m_pos >= m_current.Hi) {
                    // the next chunk can have items only when this one is
                    // filled up to its capacity
                    m_current = m_pos >= m_current.Size ? m_current.next : null;
                    if (m_current == null) {
                        m_pos = 0;
                        return false;
                    }
                    m_pos = m_current.Low;
                }

                return true;
            }

            public void Reset() {
                throw new NotSupportedException();
            }

            object IEnumerator.Current {
                get { return Current; }
            }

            #endregion

            #region IDisposable implementation

            public void Dispose() {
            }

            #endregion

            #region IEnumerator implementation

            public T Current {
                get {
                    if (m_pos == -1 || m_current == null)
                        throw new InvalidOperationException();
                    return m_current.GetAt(m_pos);
                }
            }

            #endregion
        }

        /// <summary>
        /// Returns an enumerator which reads the items of the queue without
        /// removing them.
        /// </summary>
        public IEnumerator<T> GetEnumerator() {
            return new Enumerator(m_first);
        }

        #endregion

        #region IEnumerable implementation

        IEnumerator IEnumerable.GetEnumerator() {
            return GetEnumerator();
        }

        #endregion
    }
}