annotate Implab/Parallels/AsyncQueue.cs @ 123:f4d6ea6969cc v2

async queue improvements
author cin
date Tue, 13 Jan 2015 01:42:38 +0300
parents 0c8685c8b56b
children a336cb13c6a9
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
5
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
6 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// A fixed-size segment of the queue. Elements are written into slots reserved
/// through <c>m_alloc</c>, become visible to readers once <c>m_hi</c> is advanced
/// past them and are consumed by advancing <c>m_low</c>.
/// </summary>
class Chunk {
    /// <summary>The reference to the next chunk, maintained by the owning queue.</summary>
    public Chunk next;

    // index of the next element to be dequeued
    int m_low;
    // index following the last committed (readable) element
    int m_hi;
    // reservation counter, may grow beyond m_size when the chunk is full
    int m_alloc;
    // capacity of the chunk
    readonly int m_size;
    // storage for the elements
    readonly T[] m_data;

    /// <summary>
    /// Creates an empty chunk of the specified capacity.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    public Chunk(int size) {
        m_size = size;
        m_data = new T[size];
    }

    /// <summary>
    /// Creates a chunk which already contains one committed element.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="value">The element stored in the first slot.</param>
    public Chunk(int size, T value) : this(size) {
        m_data[0] = value;
        m_hi = 1;
        m_alloc = 1;
    }

    /// <summary>
    /// Creates a chunk pre-filled with a range of elements copied from the buffer.
    /// </summary>
    /// <param name="size">The capacity of the chunk.</param>
    /// <param name="data">The buffer to copy the elements from.</param>
    /// <param name="offset">The offset of the first element in the buffer.</param>
    /// <param name="length">The number of elements to copy, they are committed immediately.</param>
    /// <param name="alloc">The initial value of the reservation counter.</param>
    public Chunk(int size, T[] data, int offset, int length, int alloc) : this(size) {
        Array.Copy(data, offset, m_data, 0, length);
        m_hi = length;
        m_alloc = alloc;
    }

    /// <summary>The index of the next element to be dequeued.</summary>
    public int Low {
        get { return m_low; }
    }

    /// <summary>The index following the last committed element.</summary>
    public int Hi {
        get { return m_hi; }
    }

    /// <summary>
    /// Tries to append a single element to the chunk.
    /// </summary>
    /// <returns><c>true</c> if the element was stored, <c>false</c> if the chunk has no free slots.</returns>
    /// <param name="value">The element to store.</param>
    /// <param name="extend"><c>true</c> only for the call which reserved the slot
    /// immediately following the last one (the reserved index equals the capacity).</param>
    public bool TryEnqueue(T value, out bool extend) {
        // reserve the slot first, the counter keeps growing even when the chunk is full
        var slot = Interlocked.Increment(ref m_alloc) - 1;

        if (slot < m_size) {
            extend = false;
            m_data[slot] = value;

            // commit: m_hi moves from slot to slot + 1, this succeeds
            // only after all preceding slots have been committed
            while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
                // spin wait for commit
            }
            return true;
        }

        extend = (slot == m_size);
        return false;
    }

    /// <summary>
    /// Tries to take a single element from the chunk.
    /// </summary>
    /// <returns><c>true</c> if an element was taken, <c>false</c> if there are no committed elements left.</returns>
    /// <param name="value">The dequeued element or <c>default(T)</c>.</param>
    /// <param name="recycle"><c>true</c> if this call has taken the element from the last
    /// slot of the chunk, or found that the read position has reached the capacity.</param>
    public bool TryDequeue(out T value, out bool recycle) {
        for (;;) {
            var head = m_low;

            if (head >= m_hi) {
                // nothing to read
                value = default(T);
                recycle = (head == m_size);
                return false;
            }

            // claim the element, retry if someone else has moved m_low
            if (Interlocked.CompareExchange(ref m_low, head + 1, head) == head) {
                recycle = (head == m_size - 1);
                value = m_data[head];
                return true;
            }
        }
    }

    /// <summary>
    /// Tries to append a range of elements to the chunk, only the part which
    /// fits into the chunk is stored.
    /// </summary>
    /// <returns><c>true</c> if at least one element was stored, <c>false</c> otherwise.</returns>
    /// <param name="batch">The buffer which contains the elements.</param>
    /// <param name="offset">The offset of the first element in the buffer.</param>
    /// <param name="length">The number of elements to store.</param>
    /// <param name="enqueued">The number of elements actually stored.</param>
    /// <param name="extend"><c>true</c> if some elements did not fit and the reservation
    /// started within the chunk or exactly at its end.</param>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
        // reserve the whole range at once, start is the first reserved index
        var start = Interlocked.Add(ref m_alloc, length) - length;
        var room = m_size - start;

        if (room < 0) {
            // the reservation starts beyond the end of the chunk,
            // the call which crossed the boundary reports the extension
            enqueued = 0;
            extend = false;
            return false;
        }

        enqueued = room < length ? room : length;
        extend = enqueued < length;

        if (enqueued == 0) {
            return false;
        }

        Array.Copy(batch, offset, m_data, start, enqueued);

        // commit the stored part, this succeeds only after
        // all preceding reservations have been committed
        while (Interlocked.CompareExchange(ref m_hi, start + enqueued, start) != start) {
            // spin wait for commit
        }

        return true;
    }

    /// <summary>
    /// Tries to take up to <paramref name="length"/> committed elements from the chunk.
    /// </summary>
    /// <returns><c>true</c> if at least one element was taken, <c>false</c> otherwise.</returns>
    /// <param name="buffer">The buffer which receives the elements.</param>
    /// <param name="offset">The offset in the buffer at which the elements are written.</param>
    /// <param name="length">The maximum number of elements to take.</param>
    /// <param name="dequeued">The number of elements actually taken.</param>
    /// <param name="recycle"><c>true</c> if this call has taken the elements up to the end
    /// of the chunk, or found that the read position has reached the capacity.</param>
    public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
        for (;;) {
            var head = m_low;
            var tail = m_hi;

            if (head >= tail) {
                dequeued = 0;
                // recycling could be restarted and we need to signal again
                recycle = (head == m_size);
                return false;
            }

            var count = Math.Min(tail - head, length);

            // claim the range, retry if someone else has moved m_low
            if (Interlocked.CompareExchange(ref m_low, head + count, head) != head) {
                continue;
            }

            recycle = (head == m_size - count);
            dequeued = count;

            Array.Copy(m_data, head, buffer, offset, count);

            return true;
        }
    }

    /// <summary>
    /// Reads the slot at the specified position without any checks against
    /// <see cref="Low"/> and <see cref="Hi"/>.
    /// </summary>
    /// <param name="pos">The index of the slot.</param>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
135
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// The default capacity of a chunk, used to initialize the chunk size of the queue.
/// </summary>
public const int DEFAULT_CHUNK_SIZE = 32;

/// <summary>
/// The maximum number of elements copied into a single chunk by <c>EnqueueRange</c>.
/// </summary>
public const int MAX_CHUNK_SIZE = 262144;

// the capacity of the chunks created by this queue
readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

// the chunk from which the elements are dequeued
Chunk m_first;
// the chunk to which the elements are enqueued
Chunk m_last;

/// <summary>
/// Creates an empty queue which consists of a single empty chunk.
/// </summary>
public AsyncQueue() {
    var initial = new Chunk(m_chunkSize);
    m_first = initial;
    m_last = initial;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
147
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Appends the specified value to the end of the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public void Enqueue(T value) {
    var tail = m_last;
    bool extend = true;

    for (;;) {
        // the fast path: there is a free slot in the last chunk
        if (tail != null && tail.TryEnqueue(value, out extend)) {
            return;
        }

        if (tail == null || extend) {
            // either there are no chunks or we are the ones who should extend
            // the queue, the new chunk already carries the value
            if (EnqueueChunk(tail, new Chunk(m_chunkSize, value))) {
                return; // success! exit!
            }
        } else {
            // someone else is extending the queue, spin until the last chunk is replaced
            while (tail == m_last) {
                Thread.MemoryBarrier();
            }
        }

        // start over with the actual last chunk
        tail = m_last;
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
171
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer, at least one element.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
/// <paramref name="length"/> is less than one or the range doesn't fit in the buffer.</exception>
public void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // Compare against the remaining space rather than computing offset + length:
    // the sum may overflow and wrap to a negative value, passing an invalid range
    // through. The range must be rejected here, since the chunk reserves the slots
    // before it copies the data and a failed copy leaves them uncommitted.
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;

    bool extend;
    int enqueued;

    while (length > 0) {
        // if there are no chunks in the queue we need to create one
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue

            while (length > 0) {

                var size = Math.Min(length, MAX_CHUNK_SIZE);

                // The reservation counter of the new chunk starts at the remaining
                // length (length >= size). While more data follows it exceeds the
                // capacity of the chunk, so the chunk reports extend = false to the
                // other writers and they wait for the next chunk instead of adding one.
                var chunk = new Chunk(
                    Math.Max(size, m_chunkSize),
                    data,
                    offset,
                    size,
                    length
                );

                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated, proceed from the beginning
                    last = m_last;
                    break;
                }

                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue if all the data has been enqueued
            if (length == 0)
                break;

            // someone else is extending the queue,
            // spin wait until the last chunk is replaced
            while (last == m_last) {
                Thread.MemoryBarrier();
            }

            last = m_last;
        }
    }
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
240
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Attempts to take the element from the head of the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The value of the dequeued element, <c>default(T)</c> if nothing was dequeued.</param>
public bool TryDequeue(out T value) {
    for (var head = m_first; head != null; head = m_first) {
        bool recycle;
        var taken = head.TryDequeue(out value, out recycle);

        // the chunk is still usable, its answer is final
        if (!recycle) {
            return taken;
        }

        // the chunk is exhausted and should be removed from the queue
        RecycleFirstChunk(head);

        // the value taken from the exhausted chunk is still valid
        if (taken) {
            return true;
        }

        // nothing was taken, try again with the actual first chunk
    }

    // the queue is empty
    value = default(T);
    return false;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
269
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue up to the specified amount of data from the queue,
/// reading from as many chunks as needed.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
/// <paramref name="length"/> is less than 1, or the range does not fit into the buffer.</exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space: 'offset + length' may overflow int
    // and wrap to a negative value, which would let an invalid range through
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;
    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            // advance the window in the destination buffer
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        // the requested amount has been read completely
        if (length == 0)
            return true;

        // we still may dequeue something
        // try again with the current head
        chunk = m_first;
    }

    return dequeued != 0;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
313
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative,
/// <paramref name="length"/> is less than 1, or the range does not fit into the buffer.</exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space: 'offset + length' may overflow int
    // and wrap to a negative value, which would let an invalid range through
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // the chunk is usable but gave no data, so the queue is empty;
        // without this exit the loop would keep re-reading the same head chunk
        if (!recycle)
            break;

        // the head chunk was exhausted, a following chunk may still contain data
        // try again
        chunk = m_first;
    }

    return false;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
355
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Tries to append <paramref name="chunk"/> to the queue, provided that
/// <paramref name="last"/> is still the current tail.
/// </summary>
/// <returns><c>true</c>, if the chunk was appended, <c>false</c> if the tail has changed.</returns>
/// <param name="last">The tail observed by the caller, may be <c>null</c>.</param>
/// <param name="chunk">The chunk to append.</param>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    // publish the new tail only if nobody changed it in the meantime
    var observed = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observed != last)
        return false;

    if (last == null)
        m_first = chunk;   // there was no tail, the chunk also becomes the head
    else
        last.next = chunk; // link the previous tail to the new chunk

    return true;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
366
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Detaches the specified chunk from the head of the queue. Gives up silently
/// when the tail or the head has already been changed.
/// </summary>
/// <param name="first">The chunk which is expected to be the current head.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // no successor: this looks like the last chunk, so the tail has to be reset first.
    // When the tail isn't this chunk anymore, either someone has already recycled it
    // or a new chunk has been appended to the queue, give up in both cases.
    if (successor == null && Interlocked.CompareExchange(ref m_last, null, first) != first)
        return;

    // move the head forward; if the head is already updated the exchange does nothing
    Interlocked.CompareExchange(ref m_first, successor, first);
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
388
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Replaces the contents of the queue with a single new chunk.
/// </summary>
public void Clear() {
    // the new chunk becomes the tail first
    var fresh = new Chunk(m_chunkSize);
    Thread.MemoryBarrier();
    m_last = fresh;
    Thread.MemoryBarrier();

    // then the head is switched to the new chunk, the old chain is no longer referenced by the queue
    m_first = fresh;
    Thread.MemoryBarrier();
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
400
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Replaces the contents of the queue with a single new chunk and returns
/// the items read from the chunks which were detached.
/// </summary>
/// <returns>The items read from the detached chunks, an empty array if there were none.</returns>
public T[] Drain() {
    // start the new queue
    var t = new Chunk(m_chunkSize);
    Thread.MemoryBarrier();
    m_last = t;
    Thread.MemoryBarrier();

    // make the new queue available to the readers, and stop the old one
    Chunk first;

    do {
        first = m_first;
        // retry until the head we have read is the one we have actually replaced
    } while (first != Interlocked.CompareExchange(ref m_first, t, first));
    Thread.MemoryBarrier();

    // 'first' is the head of the detached chain (null when the queue had no chunks)
    return ReadChunks(first);
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
418
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Reads all items from the specified chunk and the chunks linked after it.
/// </summary>
/// <returns>The items which were read, in the order they were dequeued.</returns>
/// <param name="chunk">The first chunk to read, may be <c>null</c>.</param>
T[] ReadChunks(Chunk chunk) {
    var items = new List<T>();
    var scratch = new T[m_chunkSize];

    for (var current = chunk; current != null; current = current.next) {
        int count;
        bool recycle;

        // the chunk is read through the regular dequeue operation, since some
        // clients may still be completing their own dequeue on it; such clients
        // most likely won't get any results
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out recycle))
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));
    }

    return items.ToArray();
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
436
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// A read-only view of a segment of an array exposed as <see cref="ICollection{T}"/>.
/// The data isn't copied, the view refers to the original array.
/// </summary>
struct ArraySegmentCollection : ICollection<T> {
    readonly T[] m_data;
    readonly int m_offset;
    readonly int m_length;

    /// <summary>
    /// Creates a view of <paramref name="length"/> items of <paramref name="data"/>
    /// starting at <paramref name="offset"/>. The arguments are not validated.
    /// </summary>
    public ArraySegmentCollection(T[] data, int offset, int length) {
        m_data = data;
        m_offset = offset;
        m_length = length;
    }

    #region ICollection implementation

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Add(T item) {
        throw new NotSupportedException();
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public void Clear() {
        throw new NotSupportedException();
    }

    /// <summary>Determines whether the segment contains the specified item.</summary>
    public bool Contains(T item) {
        // the length check also protects the default instance where m_data is null
        return m_length > 0 && Array.IndexOf(m_data, item, m_offset, m_length) >= 0;
    }

    /// <summary>Copies the segment to <paramref name="array"/> starting at <paramref name="arrayIndex"/>.</summary>
    public void CopyTo(T[] array, int arrayIndex) {
        Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
    }

    /// <summary>Not supported, the collection is read-only.</summary>
    public bool Remove(T item) {
        throw new NotSupportedException();
    }

    /// <summary>The number of items in the segment.</summary>
    public int Count {
        get {
            return m_length;
        }
    }

    /// <summary>Always <c>true</c>.</summary>
    public bool IsReadOnly {
        get {
            return true;
        }
    }

    #endregion

    #region IEnumerable implementation

    /// <summary>Enumerates the items of the segment in the array order.</summary>
    public IEnumerator<T> GetEnumerator() {
        for (int i = m_offset; i < m_length + m_offset; i++)
            yield return m_data[i];
    }

    #endregion

    #region IEnumerable implementation

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
501
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
502 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
503
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
504 class Enumerator : IEnumerator<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
505 Chunk m_current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
506 int m_pos = -1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
507
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
508 public Enumerator(Chunk fisrt) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
509 m_current = fisrt;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
510 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
511
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
512 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
513
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
514 public bool MoveNext() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
515 if (m_current == null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
516 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
517
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
518 if (m_pos == -1)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
519 m_pos = m_current.Low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
520 else
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
521 m_pos++;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
522 if (m_pos == m_current.Hi) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
523 m_pos = 0;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
524 m_current = m_current.next;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
525 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
526
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
527 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
528 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
529
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
530 public void Reset() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
531 throw new NotSupportedException();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
532 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
533
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
534 object IEnumerator.Current {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
535 get {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
536 return Current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
537 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
538 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
539
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
540 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
541
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
542 #region IDisposable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
543
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
544 public void Dispose() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
545 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
546
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
547 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
548
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
549 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
550
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Gets the element that <c>m_current</c> holds at index <c>m_pos</c>.
/// </summary>
/// <exception cref="InvalidOperationException">
/// Thrown when <c>m_pos</c> equals -1 or <c>m_current</c> is <c>null</c>.
/// </exception>
public T Current {
    get {
        // Happy path first: a valid position inside an existing chunk.
        if (m_pos != -1 && m_current != null) {
            return m_current.GetAt(m_pos);
        }

        throw new InvalidOperationException();
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
558
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
559 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
560 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
561
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Creates a new <c>Enumerator</c>, passing it the value of <c>m_first</c>
/// read at the time of the call.
/// </summary>
/// <returns>The newly created enumerator.</returns>
public IEnumerator<T> GetEnumerator() {
    IEnumerator<T> enumerator = new Enumerator(m_first);
    return enumerator;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
565
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
566 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
567
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
568 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
569
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Non-generic <see cref="IEnumerable.GetEnumerator"/> entry point. Returns the
/// enumerator produced by the generic <c>GetEnumerator()</c> method.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
573
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
574 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
575 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
576 }