Mercurial > pub > ImplabNet
diff Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2
improved asyncpool usability
working on batch operations on asyncqueue
author | cin |
---|---|
date | Mon, 12 Jan 2015 05:19:52 +0300 |
parents | 2573b562e328 |
children | 62d2f1e98c4e |
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs Sun Jan 11 19:13:02 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Jan 12 05:19:52 2015 +0300 @@ -36,19 +36,14 @@ } public bool TryEnqueue(T value,out bool extend) { - extend = false; - int alloc; - do { - alloc = m_alloc; - if (alloc > m_size) - return false; - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); + var alloc = Interlocked.Increment(ref m_alloc) - 1; - if (alloc == m_size) { - extend = true; + if (alloc >= m_size) { + extend = alloc == m_size; return false; } - + + extend = false; m_data[alloc] = value; while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { @@ -74,6 +69,38 @@ return true; } + public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { + int alloc; + int allocSize; + + do { + alloc = m_alloc; + + if (alloc > m_size) { + enqueued = 0; + extend = false; + return false; + } + + allocSize = Math.Min(m_size - m_alloc, length); + } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); + + if (alloc == m_size) { + enqueued = 0; + extend = true; + return false; + } + + Array.Copy(batch, offset, m_data, alloc, allocSize); + enqueued = allocSize; + extend = false; + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + // spin wait for commit + } + return true; + } + public T GetAt(int pos) { return m_data[pos]; }