comparison Implab/Parallels/AsyncQueue.cs @ 122:0c8685c8b56b v2

minor fixes and improvements of AsyncQueue, additional tests
author cin
date Mon, 12 Jan 2015 22:20:45 +0300
parents 62d2f1e98c4e
children f4d6ea6969cc
comparison
equal deleted inserted replaced
121:62d2f1e98c4e 122:0c8685c8b56b
76 76
77 return true; 77 return true;
78 } 78 }
79 79
80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { 80 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
81 int alloc; 81 //int alloc;
82 int allocSize; 82 //int allocSize;
83 83
84 // in case the batch size is larger than a free space in chunk 84 var alloc = Interlocked.Add(ref m_alloc, length) - length;
85 // tailGap is used to over allocate the space in the chunk to 85 if (alloc > m_size) {
86 // get exclusive permission on creation of the next one. 86 // the chunk is full and someone already
87 int tailGap = 0; 87 // creating the new one
88 88 enqueued = 0; // nothing was added
89 do { 89 extend = false; // the caller shouldn't try to extend the queue
90 alloc = m_alloc; 90 return false; // nothing was added
91 91 }
92 if (alloc > m_size) { 92
93 // the chunk is full and someone already 93 enqueued = Math.Min(m_size - alloc, length);
94 // creating the new one 94 extend = length > enqueued;
95 enqueued = 0; // nothing was added 95
96 extend = false; // the caller shouldn't try to extend the queue 96 if (enqueued == 0)
97 return false; // nothing was added
98 }
99
100 allocSize = Math.Min(m_size - alloc, length);
101 if (allocSize < length) // the chunk doesn't have enough space to hold the whole batch
102 tailGap = 1; // overallocate space to get exclusive permission to extend queue
103 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize + tailGap, alloc));
104
105 extend = tailGap != 0;
106 enqueued = allocSize;
107
108 // if the chunk was already full then length > 0, allocSize = 0, tailGap = 1
109 if (alloc == m_size)
110 return false; 97 return false;
111 98
112 Array.Copy(batch, offset, m_data, alloc, allocSize); 99
113 100 Array.Copy(batch, offset, m_data, alloc, enqueued);
114 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { 101
102 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
115 // spin wait for commit 103 // spin wait for commit
116 } 104 }
105
117 return true; 106 return true;
118 } 107 }
119 108
120 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) { 109 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
121 int low, hi, batchSize; 110 int low, hi, batchSize;
167 while (last == null || !last.TryEnqueue(value, out extend)) { 156 while (last == null || !last.TryEnqueue(value, out extend)) {
168 // try to extend queue 157 // try to extend queue
169 if (extend || last == null) { 158 if (extend || last == null) {
170 var chunk = new Chunk(m_chunkSize, value); 159 var chunk = new Chunk(m_chunkSize, value);
171 if (EnqueueChunk(last, chunk)) 160 if (EnqueueChunk(last, chunk))
172 break; 161 break; // success! exit!
173 last = m_last; 162 last = m_last;
174 } else { 163 } else {
175 while (last == m_last) { 164 while (last == m_last) {
176 Thread.MemoryBarrier(); 165 Thread.MemoryBarrier();
177 } 166 }
324 313
325 /// <summary> 314 /// <summary>
326 /// Tries to dequeue all remaining data in the first chunk. 315 /// Tries to dequeue all remaining data in the first chunk.
327 /// </summary> 316 /// </summary>
328 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns> 317 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
329 /// <param name="buffer">The buffer to which data will be written.</param> 318 /// <param name="buffer">The buffer to which the data will be written.</param>
330 /// <param name="offset">The offset in the buffer at which the data will be written.</param> 319 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
331 /// <param name="length">Tha maximum amount of the data to be dequeued.</param> 320 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
332 /// <param name="dequeued">The actual amount of the dequeued data.</param> 321 /// <param name="dequeued">The actual amount of the dequeued data.</param>
333 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) { 322 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
334 if (buffer == null) 323 if (buffer == null)