119
|
1 using System.Threading;
|
|
2 using System.Collections.Generic;
|
|
3 using System;
|
|
4 using System.Collections;
|
|
5
|
|
6 namespace Implab.Parallels {
|
|
7 public class AsyncQueue<T> : IEnumerable<T> {
|
|
8 class Chunk {
|
|
9 public Chunk next;
|
|
10
|
|
11 int m_low;
|
|
12 int m_hi;
|
|
13 int m_alloc;
|
|
14 readonly int m_size;
|
|
15 readonly T[] m_data;
|
|
16
|
|
17 public Chunk(int size) {
|
|
18 m_size = size;
|
|
19 m_data = new T[size];
|
|
20 }
|
|
21
|
|
22 public Chunk(int size, T value) {
|
|
23 m_size = size;
|
|
24 m_hi = 1;
|
|
25 m_alloc = 1;
|
|
26 m_data = new T[size];
|
|
27 m_data[0] = value;
|
|
28 }
|
|
29
|
|
30 public int Low {
|
|
31 get { return m_low; }
|
|
32 }
|
|
33
|
|
34 public int Hi {
|
|
35 get { return m_hi; }
|
|
36 }
|
|
37
|
|
38 public bool TryEnqueue(T value,out bool extend) {
|
|
39 extend = false;
|
|
40 int alloc;
|
|
41 do {
|
|
42 alloc = m_alloc;
|
|
43 if (alloc > m_size)
|
|
44 return false;
|
|
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
|
|
46
|
|
47 if (alloc == m_size) {
|
|
48 extend = true;
|
|
49 return false;
|
|
50 }
|
|
51
|
|
52 m_data[alloc] = value;
|
|
53
|
|
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
|
|
55 // spin wait for commit
|
|
56 }
|
|
57 return true;
|
|
58 }
|
|
59
|
|
60 public bool TryDequeue(out T value,out bool recycle) {
|
|
61 int low;
|
|
62 do {
|
|
63 low = m_low;
|
|
64 if (low >= m_hi) {
|
|
65 value = default(T);
|
|
66 recycle = (low == m_size);
|
|
67 return false;
|
|
68 }
|
|
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
|
|
70
|
|
71 recycle = (low == m_size - 1);
|
|
72 value = m_data[low];
|
|
73
|
|
74 return true;
|
|
75 }
|
|
76
|
|
77 public T GetAt(int pos) {
|
|
78 return m_data[pos];
|
|
79 }
|
|
80 }
|
|
81
|
|
82 public const int DEFAULT_CHUNK_SIZE = 32;
|
|
83
|
|
84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;
|
|
85
|
|
86 Chunk m_first;
|
|
87 Chunk m_last;
|
|
88
|
|
89 public AsyncQueue() {
|
|
90 m_last = m_first = new Chunk(m_chunkSize);
|
|
91 }
|
|
92
|
|
93 public void Enqueue(T value) {
|
|
94 var last = m_last;
|
|
95 // spin wait to the new chunk
|
|
96 bool extend = true;
|
|
97 while(last == null || !last.TryEnqueue(value, out extend)) {
|
|
98 // try to extend queue
|
|
99 if (extend || last == null) {
|
|
100 var chunk = new Chunk(m_chunkSize, value);
|
|
101 if (EnqueueChunk(last, chunk))
|
|
102 break;
|
|
103 last = m_last;
|
|
104 } else {
|
|
105 while (last != m_last) {
|
|
106 Thread.MemoryBarrier();
|
|
107 last = m_last;
|
|
108 }
|
|
109 }
|
|
110 }
|
|
111 }
|
|
112
|
|
113 public bool TryDequeue(out T value) {
|
|
114 var chunk = m_first;
|
|
115 bool recycle;
|
|
116 while (chunk != null) {
|
|
117
|
|
118 var result = chunk.TryDequeue(out value, out recycle);
|
|
119
|
|
120 if (recycle) // this chunk is waste
|
|
121 RecycleFirstChunk(chunk);
|
|
122 else
|
|
123 return result; // this chunk is usable and returned actual result
|
|
124
|
|
125 if (result) // this chunk is waste but the true result is always actual
|
|
126 return true;
|
|
127
|
|
128 // try again
|
|
129 chunk = m_first;
|
|
130 }
|
|
131
|
|
132 // the queue is empty
|
|
133 value = default(T);
|
|
134 return false;
|
|
135 }
|
|
136
|
|
137 bool EnqueueChunk(Chunk last, Chunk chunk) {
|
|
138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
|
|
139 return false;
|
|
140
|
|
141 if (last != null)
|
|
142 last.next = chunk;
|
|
143 else
|
|
144 m_first = chunk;
|
|
145 return true;
|
|
146 }
|
|
147
|
|
148 void RecycleFirstChunk(Chunk first) {
|
|
149 var next = first.next;
|
|
150
|
|
151 if (next == null) {
|
|
152 // looks like this is the last chunk
|
|
153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
|
|
154 // race
|
|
155 // maybe someone already recycled this chunk
|
|
156 // or a new chunk has been appedned to the queue
|
|
157
|
|
158 return; // give up
|
|
159 }
|
|
160 // the tail is updated
|
|
161 }
|
|
162
|
|
163 // we need to update the head
|
|
164 Interlocked.CompareExchange(ref m_first, next, first);
|
|
165 // if the head is already updated then give up
|
|
166 return;
|
|
167
|
|
168 }
|
|
169
|
|
170 #region IEnumerable implementation
|
|
171
|
|
172 class Enumerator : IEnumerator<T> {
|
|
173 Chunk m_current;
|
|
174 int m_pos = -1;
|
|
175
|
|
176 public Enumerator(Chunk fisrt) {
|
|
177 m_current = fisrt;
|
|
178 }
|
|
179
|
|
180 #region IEnumerator implementation
|
|
181
|
|
182 public bool MoveNext() {
|
|
183 if (m_current == null)
|
|
184 return false;
|
|
185
|
|
186 if (m_pos == -1)
|
|
187 m_pos = m_current.Low;
|
|
188 else
|
|
189 m_pos++;
|
|
190 if (m_pos == m_current.Hi) {
|
|
191 m_pos = 0;
|
|
192 m_current = m_current.next;
|
|
193 }
|
|
194
|
|
195 return true;
|
|
196 }
|
|
197
|
|
198 public void Reset() {
|
|
199 throw new NotSupportedException();
|
|
200 }
|
|
201
|
|
202 object IEnumerator.Current {
|
|
203 get {
|
|
204 return Current;
|
|
205 }
|
|
206 }
|
|
207
|
|
208 #endregion
|
|
209
|
|
210 #region IDisposable implementation
|
|
211
|
|
212 public void Dispose() {
|
|
213 }
|
|
214
|
|
215 #endregion
|
|
216
|
|
217 #region IEnumerator implementation
|
|
218
|
|
219 public T Current {
|
|
220 get {
|
|
221 if (m_pos == -1 || m_current == null)
|
|
222 throw new InvalidOperationException();
|
|
223 return m_current.GetAt(m_pos);
|
|
224 }
|
|
225 }
|
|
226
|
|
227 #endregion
|
|
228 }
|
|
229
|
|
230 public IEnumerator<T> GetEnumerator() {
|
|
231 return new Enumerator(m_first);
|
|
232 }
|
|
233
|
|
234 #endregion
|
|
235
|
|
236 #region IEnumerable implementation
|
|
237
|
|
238 IEnumerator IEnumerable.GetEnumerator() {
|
|
239 return GetEnumerator();
|
|
240 }
|
|
241
|
|
242 #endregion
|
|
243 }
|
|
244 }
|