Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | 238e15580926 |
children | 8d5de4eb9c2c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/AsyncQueue.cs Fri Apr 22 13:10:34 2016 +0300 @@ -0,0 +1,631 @@ +using System.Threading; +using System.Collections.Generic; +using System; +using System.Collections; +using System.Diagnostics; + +namespace Implab.Parallels { + public class AsyncQueue<T> : IEnumerable<T> { + class Chunk { + public Chunk next; + + int m_low; + int m_hi; + int m_alloc; + readonly int m_size; + readonly T[] m_data; + + public Chunk(int size) { + m_size = size; + m_data = new T[size]; + } + + public Chunk(int size, T value) { + m_size = size; + m_hi = 1; + m_alloc = 1; + m_data = new T[size]; + m_data[0] = value; + } + + public Chunk(int size, T[] data, int offset, int length, int alloc) { + m_size = size; + m_hi = length; + m_alloc = alloc; + m_data = new T[size]; + Array.Copy(data, offset, m_data, 0, length); + } + + public int Low { + get { return m_low; } + } + + public int Hi { + get { return m_hi; } + } + + public int Size { + get { return m_size; } + } + + public bool TryEnqueue(T value, out bool extend) { + var alloc = Interlocked.Increment(ref m_alloc) - 1; + + if (alloc >= m_size) { + extend = alloc == m_size; + return false; + } + + extend = false; + m_data[alloc] = value; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { + // spin wait for commit + } + return true; + } + + /// <summary> + /// Prevents from allocating new space in the chunk and waits for all write operations to complete + /// </summary> + public void Commit() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); + + while (m_hi != actual) + Thread.MemoryBarrier(); + } + + public bool TryDequeue(out T value, out bool recycle) { + int low; + do { + low = m_low; + if (low >= m_hi) { + value = default(T); + recycle = (low == m_size); + return false; + } + } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); + + recycle = (low == m_size - 1); + value = m_data[low]; + + return true; + } + + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { + //int alloc; + //int allocSize; + + var alloc = Interlocked.Add(ref m_alloc, length) - length; + if (alloc > m_size) { + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added + } + + enqueued = Math.Min(m_size - alloc, length); + extend = length > enqueued; + + if (enqueued == 0) + return false; + + + Array.Copy(batch, offset, m_data, alloc, enqueued); + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { + // spin wait for commit + } + + return true; + } + + public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { + int low, hi, batchSize; + + do { + low = m_low; + hi = m_hi; + if (low >= hi) { + dequeued = 0; + recycle = (low == m_size); // recycling could be restarted and we need to signal again + return false; + } + batchSize = Math.Min(hi - low, length); + } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low)); + + recycle = (low == m_size - batchSize); + dequeued = batchSize; + + Array.Copy(m_data, low, buffer, offset, batchSize); + + return true; + } + + public T GetAt(int pos) { + return m_data[pos]; + } + } + + public const int DEFAULT_CHUNK_SIZE = 32; + public const int MAX_CHUNK_SIZE = 262144; + + Chunk m_first; + Chunk m_last; + + /// <summary> + /// Adds the specified value to the queue. + /// </summary> + /// <param name="value">Tha value which will be added to the queue.</param> + public virtual void Enqueue(T value) { + var last = m_last; + // spin wait to the new chunk + bool extend = true; + while (last == null || !last.TryEnqueue(value, out extend)) { + // try to extend queue + if (extend || last == null) { + var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value); + if (EnqueueChunk(last, chunk)) + break; // success! exit! + last = m_last; + } else { + while (last == m_last) { + Thread.MemoryBarrier(); + } + last = m_last; + } + } + } + + /// <summary> + /// Adds the specified data to the queue. + /// </summary> + /// <param name="data">The buffer which contains the data to be enqueued.</param> + /// <param name="offset">The offset of the data in the buffer.</param> + /// <param name="length">The size of the data to read from the buffer.</param> + public virtual void EnqueueRange(T[] data, int offset, int length) { + if (data == null) + throw new ArgumentNullException("data"); + if (length == 0) + return; + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > data.Length) + throw new ArgumentOutOfRangeException("length"); + + var last = m_last; + + bool extend; + int enqueued; + + while (length > 0) { + extend = true; + if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) { + length -= enqueued; + offset += enqueued; + } + + if (extend) { + // there was no enough space in the chunk + // or there was no chunks in the queue + + while (length > 0) { + + var size = Math.Min(length, MAX_CHUNK_SIZE); + + var chunk = new Chunk( + Math.Max(size, DEFAULT_CHUNK_SIZE), + data, + offset, + size, + length // length >= size + ); + + if (!EnqueueChunk(last, chunk)) { + // looks like the queue has been updated then proceed from the beginning + last = m_last; + break; + } + + // we have successfully added the new chunk + last = chunk; + length -= size; + offset += size; + } + } else { + // we don't need to extend the queue, if we successfully enqueued data + if (length == 0) + break; + + // if we need to wait while someone is extending the queue + // spinwait + while (last == m_last) { + Thread.MemoryBarrier(); + } + + last = m_last; + } + } + } + + /// <summary> + /// Tries to retrieve the first element from the queue. + /// </summary> + /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns> + /// <param name="value">The value of the dequeued element.</param> + public bool TryDequeue(out T value) { + var chunk = m_first; + bool recycle; + while (chunk != null) { + + var result = chunk.TryDequeue(out value, out recycle); + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else + return result; // this chunk is usable and returned actual result + + if (result) // this chunk is waste but the true result is always actual + return true; + + // try again + chunk = m_first; + } + + // the queue is empty + value = default(T); + return false; + } + + /// <summary> + /// Tries to dequeue the specified amount of data from the queue. + /// </summary> + /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns> + /// <param name="buffer">The buffer to which the data will be written.</param> + /// <param name="offset">The offset in the buffer at which the data will be written.</param> + /// <param name="length">The maximum amount of data to be retrieved.</param> + /// <param name="dequeued">The actual amout of the retrieved data.</param> + public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + offset += actual; + length -= actual; + dequeued += actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + else if (actual == 0) + break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue) + + if (length == 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return dequeued != 0; + } + + /// <summary> + /// Tries to dequeue all remaining data in the first chunk. + /// </summary> + /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> + /// <param name="buffer">The buffer to which the data will be written.</param> + /// <param name="offset">The offset in the buffer at which the data will be written.</param> + /// <param name="length">Tha maximum amount of the data to be dequeued.</param> + /// <param name="dequeued">The actual amount of the dequeued data.</param> + public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { + if (buffer == null) + throw new ArgumentNullException("buffer"); + if (offset < 0) + throw new ArgumentOutOfRangeException("offset"); + if (length < 1 || offset + length > buffer.Length) + throw new ArgumentOutOfRangeException("length"); + + var chunk = m_first; + bool recycle; + dequeued = 0; + + while (chunk != null) { + + int actual; + if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) { + dequeued = actual; + } + + if (recycle) // this chunk is waste + RecycleFirstChunk(chunk); + + // if we have dequeued any data, then return + if (dequeued != 0) + return true; + + // we still may dequeue something + // try again + chunk = m_first; + } + + return false; + } + + bool EnqueueChunk(Chunk last, Chunk chunk) { + if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) + return false; + + if (last != null) + last.next = chunk; + else { + m_first = chunk; + } + return true; + } + + void RecycleFirstChunk(Chunk first) { + var next = first.next; + + if (first != Interlocked.CompareExchange(ref m_first, next, first)) + return; + + if (next == null) { + + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + /*while (first.next == null) + Thread.MemoryBarrier();*/ + + // race + // someone already updated the tail, restore the pointer to the queue head + m_first = first; + } + // the tail is updated + } + + // we need to update the head + //Interlocked.CompareExchange(ref m_first, next, first); + // if the head is already updated then give up + //return; + + } + + public void Clear() { + // start the new queue + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); + + do { + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) // nothing to clear + return; + + if (first == null || (first.next == null && first != last)) // inconcistency + continue; + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + m_last = chunk; + + return; + + } while(true); + } + + public T[] Drain() { + // start the new queue + var chunk = new Chunk(DEFAULT_CHUNK_SIZE); + + do { + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) + return new T[0]; + + if (first == null || (first.next == null && first != last)) + continue; + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + last = Interlocked.Exchange(ref m_last, chunk); + + return ReadChunks(first, last); + + } while(true); + } + + static T[] ReadChunks(Chunk chunk, object last) { + var result = new List<T>(); + var buffer = new T[DEFAULT_CHUNK_SIZE]; + int actual; + bool recycle; + while (chunk != null) { + // ensure all write operations on the chunk are complete + chunk.Commit(); + + // we need to read the chunk using this way + // since some client still may completing the dequeue + // operation, such clients most likely won't get results + while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) + result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); + + if (chunk == last) { + chunk = null; + } else { + while (chunk.next == null) + Thread.MemoryBarrier(); + chunk = chunk.next; + } + } + + return result.ToArray(); + } + + struct ArraySegmentCollection : ICollection<T> { + readonly T[] m_data; + readonly int m_offset; + readonly int m_length; + + public ArraySegmentCollection(T[] data, int offset, int length) { + m_data = data; + m_offset = offset; + m_length = length; + } + + #region ICollection implementation + + public void Add(T item) { + throw new NotSupportedException(); + } + + public void Clear() { + throw new NotSupportedException(); + } + + public bool Contains(T item) { + return false; + } + + public void CopyTo(T[] array, int arrayIndex) { + Array.Copy(m_data,m_offset,array,arrayIndex, m_length); + } + + public bool Remove(T item) { + throw new NotSupportedException(); + } + + public int Count { + get { + return m_length; + } + } + + public bool IsReadOnly { + get { + return true; + } + } + + #endregion + + #region IEnumerable implementation + + public IEnumerator<T> GetEnumerator() { + for (int i = m_offset; i < m_length + m_offset; i++) + yield return m_data[i]; + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } + + #region IEnumerable implementation + + class Enumerator : IEnumerator<T> { + Chunk m_current; + int m_pos = -1; + + public Enumerator(Chunk fisrt) { + m_current = fisrt; + } + + #region IEnumerator implementation + + public bool MoveNext() { + if (m_current == null) + return false; + + if (m_pos == -1) + m_pos = m_current.Low; + else + m_pos++; + + if (m_pos == m_current.Hi) { + + m_current = m_pos == m_current.Size ? m_current.next : null; + + m_pos = 0; + + if (m_current == null) + return false; + } + + return true; + } + + public void Reset() { + throw new NotSupportedException(); + } + + object IEnumerator.Current { + get { + return Current; + } + } + + #endregion + + #region IDisposable implementation + + public void Dispose() { + } + + #endregion + + #region IEnumerator implementation + + public T Current { + get { + if (m_pos == -1 || m_current == null) + throw new InvalidOperationException(); + return m_current.GetAt(m_pos); + } + } + + #endregion + } + + public IEnumerator<T> GetEnumerator() { + return new Enumerator(m_first); + } + + #endregion + + #region IEnumerable implementation + + IEnumerator IEnumerable.GetEnumerator() { + return GetEnumerator(); + } + + #endregion + } +}