Mercurial > pub > ImplabNet
changeset 122:0c8685c8b56b v2
minor fixes and improvements of AsyncQueue, additional tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 22:20:45 +0300 |
parents | 62d2f1e98c4e |
children | f4d6ea6969cc |
files | Implab.Test/AsyncTests.cs Implab/Parallels/AsyncQueue.cs MonoPlay/Program.cs |
diffstat | 3 files changed, 96 insertions(+), 31 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Mon Jan 12 18:19:41 2015 +0300 +++ b/Implab.Test/AsyncTests.cs Mon Jan 12 22:20:45 2015 +0300 @@ -430,6 +430,82 @@ } [TestMethod] + public void AsyncQueueChunkDequeueTest() { + var queue = new AsyncQueue<int>(); + + const int wBatch = 31; + const int wCount = 200000; + const int total = wBatch * wCount * 3; + const int summ = wBatch * wCount * 6; + + int r1 = 0, r2 = 0; + const int rBatch = 1024; + int read = 0; + + var t1 = Environment.TickCount; + + AsyncPool.RunThread( + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 1; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 2; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[wBatch]; + for(int i = 0; i<wBatch; i++) + buffer[i] = 3; + + for(int i =0; i < wCount; i++) + queue.EnqueueRange(buffer,0,wBatch); + Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); + }, + () => { + var buffer = new int[rBatch]; + int count = 1; + double avgchunk = 0; + while(read < total) { + int actual; + if (queue.TryDequeueChunk(buffer,0,rBatch,out actual)) { + for(int i=0; i< actual; i++) + r2 += buffer[i]; + Interlocked.Add(ref read, actual); + avgchunk = avgchunk*(count-1)/count + actual/(double)count; + count ++; + } + } + + Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); + } + ) + .Combine() + .Join(); + + Assert.AreEqual(summ , r1 + r2); + + Console.WriteLine( + "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", + Environment.TickCount - t1, + r1, + r2, + r1 + r2, + total + ); + } + + [TestMethod] public void ParallelMapTest() { const int count = 100000;
--- a/Implab/Parallels/AsyncQueue.cs Mon Jan 12 18:19:41 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Jan 12 22:20:45 2015 +0300 @@ -78,42 +78,31 @@ } public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { - int alloc; - int allocSize; - - // in case the batch size is larger than a free space in chunk - // tailGap is used to over allocate the space in the chunk to - // get exclusive permission on creation of the next one. - int tailGap = 0; - - do { - alloc = m_alloc; + //int alloc; + //int allocSize; - if (alloc > m_size) { - // the chunk is full and someone already - // creating the new one - enqueued = 0; // nothing was added - extend = false; // the caller shouldn't try to extend the queue - return false; // nothing was added - } + var alloc = Interlocked.Add(ref m_alloc, length) - length; + if (alloc > m_size) { + // the chunk is full and someone already + // creating the new one + enqueued = 0; // nothing was added + extend = false; // the caller shouldn't try to extend the queue + return false; // nothing was added + } - allocSize = Math.Min(m_size - alloc, length); - if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch - tailGap = 1; // overallocate space to get exclusive permission to extend queue - } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc)); - - extend = tailGap != 0; - enqueued = allocSize; + enqueued = Math.Min(m_size - alloc, length); + extend = length > enqueued; - // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1 - if (alloc == m_size) + if (enqueued == 0) return false; - Array.Copy(batch, offset, m_data, alloc, allocSize); - while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { + Array.Copy(batch, offset, m_data, alloc, enqueued); + + while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) { // spin wait for commit } + return true; } @@ -169,7 +158,7 @@ if (extend || last == null) { var chunk = new Chunk(m_chunkSize, value); if (EnqueueChunk(last, chunk)) - break; + break; // success! exit! last = m_last; } else { while (last == m_last) { @@ -326,7 +315,7 @@ /// Tries to dequeue all remaining data in the first chunk. /// </summary> /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> - /// <param name="buffer">The buffer to which data will be written.</param> + /// <param name="buffer">The buffer to which the data will be written.</param> /// <param name="offset">The offset in the buffer at which the data will be written.</param> /// <param name="length">Tha maximum amount of the data to be dequeued.</param> /// <param name="dequeued">The actual amount of the dequeued data.</param>
--- a/MonoPlay/Program.cs Mon Jan 12 18:19:41 2015 +0300 +++ b/MonoPlay/Program.cs Mon Jan 12 22:20:45 2015 +0300 @@ -54,7 +54,7 @@ .Combine() .Join(); - Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}", Environment.TickCount - t1, res1, res2, res1 + res2); + Console.WriteLine("done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", Environment.TickCount - t1, res1, res2, res1 + res2, count*2); var t2 = Environment.TickCount; Console.WriteLine("MTQueue: {0} ms", t2 - t1);