Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 124:a336cb13c6a9 v2
major update, added Drain mathod to AsyncQueue class
author | cin |
---|---|
date | Thu, 15 Jan 2015 02:43:14 +0300 |
parents | f4d6ea6969cc |
children | f803565868a4 |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Tue Jan 13 01:42:38 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Thu Jan 15 02:43:14 2015 +0300 @@ -2,6 +2,7 @@ using System.Collections.Generic; using System; using System.Collections; +using System.Diagnostics; namespace Implab.Parallels { public class AsyncQueue<T> : IEnumerable<T> { @@ -60,6 +61,16 @@ return true; } + /// <summary> + /// Prevents from allocating new space in the chunk and waits for all write operations to complete + /// </summary> + public void Commit() { + var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size); + + while (m_hi != actual) + Thread.MemoryBarrier(); + } + public bool TryDequeue(out T value, out bool recycle) { int low; do { @@ -359,76 +370,114 @@ if (last != null) last.next = chunk; - else + else { m_first = chunk; + } return true; } void RecycleFirstChunk(Chunk first) { var next = first.next; + if (first != Interlocked.CompareExchange(ref m_first, next, first)) + return; + if (next == null) { - // looks like this is the last chunk + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + /*while (first.next == null) + Thread.MemoryBarrier();*/ + // race - // maybe someone already recycled this chunk - // or a new chunk has been appedned to the queue - - return; // give up + // someone already updated the tail, restore the pointer to the queue head + m_first = first; } // the tail is updated } // we need to update the head - Interlocked.CompareExchange(ref m_first, next, first); + //Interlocked.CompareExchange(ref m_first, next, first); // if the head is already updated then give up - return; + //return; } public void Clear() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); + var chunk = new Chunk(m_chunkSize); + + do { + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) // nothing to clear + return; - // make the new queue available to the readers, and stop the old one - m_first = t; - Thread.MemoryBarrier(); + if (first == null || (first.next == null && first != last)) // inconcistency + continue; + + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + + m_last = chunk; + + return; + + } while(true); } public T[] Drain() { // start the new queue - var t = new Chunk(m_chunkSize); - Thread.MemoryBarrier(); - m_last = t; - Thread.MemoryBarrier(); - - // make the new queue available to the readers, and stop the old one - Chunk first; + var chunk = new Chunk(m_chunkSize); do { - first = m_first; - } while(first != Interlocked.CompareExchange(ref m_first - Thread.MemoryBarrier(); + Thread.MemoryBarrier(); + var first = m_first; + var last = m_last; + + if (last == null) + return new T[0]; + + if (first == null || (first.next == null && first != last)) + continue; + // here we will create inconsistency which will force others to spin + // and prevent from fetching. chunk.next = null + if (first != Interlocked.CompareExchange(ref m_first, chunk, first)) + continue;// inconsistent + last = Interlocked.Exchange(ref m_last, chunk); + + return ReadChunks(first, last); + + } while(true); } - T[] ReadChunks(Chunk chunk) { + T[] ReadChunks(Chunk chunk, object last) { var result = new List<T>(); var buffer = new T[m_chunkSize]; int actual; bool recycle; while (chunk != null) { + // ensure all write operations on the chunk are complete + chunk.Commit(); + // we need to read the chunk using this way // since some client still may completing the dequeue // operation, such clients most likely won't get results while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle)) result.AddRange(new ArraySegmentCollection(buffer, 0, actual)); - chunk = chunk.next; + if (chunk == last) { + chunk = null; + } else { + while (chunk.next == null) + Thread.MemoryBarrier(); + chunk = chunk.next; + } } return result.ToArray();