comparison Implab/Parallels/AsyncQueue.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents
children f1b897999260
comparison
equal deleted inserted replaced
118:e046a94eecb1 119:2573b562e328
1 using System.Threading;
2 using System.Collections.Generic;
3 using System;
4 using System.Collections;
5
6 namespace Implab.Parallels {
/// <summary>
/// A first-in/first-out queue stored as a singly linked list of fixed-size
/// chunks. Slots are reserved and released with
/// <see cref="Interlocked.CompareExchange(ref int, int, int)"/> rather than locks.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class AsyncQueue<T> : IEnumerable<T> {
    /// <summary>
    /// A fixed-size segment of the queue. Items are written at increasing
    /// positions and read at increasing positions; a chunk is never rewound.
    /// </summary>
    class Chunk {
        /// <summary>The chunk that follows this one, or null if none is linked yet.</summary>
        public Chunk next;

        int m_low;   // position of the next item to dequeue
        int m_hi;    // positions below this value hold committed items
        int m_alloc; // positions below this value are reserved by writers; can reach m_size + 1
        readonly int m_size;
        readonly T[] m_data;

        /// <summary>Creates an empty chunk with room for <paramref name="size"/> items.</summary>
        public Chunk(int size) {
            m_size = size;
            m_data = new T[size];
        }

        /// <summary>
        /// Creates a chunk with room for <paramref name="size"/> items whose
        /// first slot already holds <paramref name="value"/>.
        /// </summary>
        public Chunk(int size, T value) {
            m_size = size;
            m_hi = 1;
            m_alloc = 1;
            m_data = new T[size];
            m_data[0] = value;
        }

        /// <summary>Position of the next item to dequeue.</summary>
        public int Low {
            get { return m_low; }
        }

        /// <summary>Position just past the last committed item.</summary>
        public int Hi {
            get { return m_hi; }
        }

        /// <summary>
        /// Tries to store <paramref name="value"/> in this chunk.
        /// </summary>
        /// <param name="value">The item to store.</param>
        /// <param name="extend">
        /// Set to true for the single caller that reserved the position equal
        /// to the chunk size; that caller is expected to append a new chunk.
        /// </param>
        /// <returns>true if the value was stored; false if the chunk is full.</returns>
        public bool TryEnqueue(T value, out bool extend) {
            extend = false;
            int alloc;
            do {
                alloc = m_alloc;
                // the overflow position (m_size) was already handed out
                if (alloc > m_size)
                    return false;
            } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));

            // this caller reserved the overflow position: nothing is stored,
            // the caller is told to extend the queue instead
            if (alloc == m_size) {
                extend = true;
                return false;
            }

            m_data[alloc] = value;

            // publish the slot; m_hi advances strictly in reservation order,
            // so wait until all lower positions have been committed
            while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
                // spin wait for commit
            }
            return true;
        }

        /// <summary>
        /// Tries to take the next committed item from this chunk.
        /// </summary>
        /// <param name="value">The dequeued item, or default(T) on failure.</param>
        /// <param name="recycle">
        /// Set to true when this chunk has no more positions to read: either
        /// the last position was just taken, or the read position already
        /// equals the chunk size.
        /// </param>
        /// <returns>true if an item was taken; otherwise false.</returns>
        public bool TryDequeue(out T value, out bool recycle) {
            int low;
            do {
                low = m_low;
                if (low >= m_hi) {
                    value = default(T);
                    recycle = (low == m_size);
                    return false;
                }
            } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));

            recycle = (low == m_size - 1);
            value = m_data[low];

            return true;
        }

        /// <summary>Returns the item stored at <paramref name="pos"/> without any bounds or state checks.</summary>
        public T GetAt(int pos) {
            return m_data[pos];
        }
    }

    public const int DEFAULT_CHUNK_SIZE = 32;

    readonly int m_chunkSize = DEFAULT_CHUNK_SIZE;

    Chunk m_first;
    Chunk m_last;

    /// <summary>Creates an empty queue consisting of a single empty chunk.</summary>
    public AsyncQueue() {
        m_last = m_first = new Chunk(m_chunkSize);
    }

    /// <summary>
    /// Appends <paramref name="value"/> to the tail of the queue, adding a
    /// new chunk when the current tail chunk is full or missing.
    /// </summary>
    /// <param name="value">The item to append.</param>
    public void Enqueue(T value) {
        var last = m_last;
        bool extend = true;
        while (last == null || !last.TryEnqueue(value, out extend)) {
            if (extend || last == null) {
                // either this caller was chosen to extend the queue or there
                // is no tail chunk at all: append a chunk that already
                // carries the value
                var chunk = new Chunk(m_chunkSize, value);
                if (EnqueueChunk(last, chunk))
                    break;
                // the tail changed in the meantime, retry with the new tail
                last = m_last;
            } else {
                // the chunk is full and someone else is replacing the tail:
                // wait until m_last is no longer this chunk, then retry
                while (last == m_last)
                    Thread.MemoryBarrier();
                last = m_last;
            }
        }
    }

    /// <summary>
    /// Tries to remove the item at the head of the queue.
    /// </summary>
    /// <param name="value">The removed item, or default(T) if none was removed.</param>
    /// <returns>true if an item was removed; false if no item was available.</returns>
    public bool TryDequeue(out T value) {
        var chunk = m_first;
        bool recycle;
        while (chunk != null) {

            var result = chunk.TryDequeue(out value, out recycle);

            if (recycle) // this chunk is exhausted
                RecycleFirstChunk(chunk);
            else
                return result; // this chunk is usable and returned the actual result

            if (result) // the chunk is exhausted, but a successful read is still valid
                return true;

            // try again with whatever the head is now
            chunk = m_first;
        }

        // the queue is empty
        value = default(T);
        return false;
    }

    /// <summary>
    /// Makes <paramref name="chunk"/> the new tail if the tail is still
    /// <paramref name="last"/>, then links it after <paramref name="last"/>
    /// (or installs it as the head when <paramref name="last"/> is null).
    /// </summary>
    /// <returns>true if the chunk was appended; false if the tail had changed.</returns>
    bool EnqueueChunk(Chunk last, Chunk chunk) {
        if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
            return false;

        if (last != null)
            last.next = chunk;
        else
            m_first = chunk;
        return true;
    }

    /// <summary>
    /// Detaches the exhausted chunk <paramref name="first"/> from the head of
    /// the queue. Does nothing if another thread already changed the head or
    /// the tail in a conflicting way.
    /// </summary>
    void RecycleFirstChunk(Chunk first) {
        var next = first.next;

        if (next == null) {
            // looks like this is the last chunk
            if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
                // race:
                // maybe someone already recycled this chunk
                // or a new chunk has been appended to the queue

                return; // give up
            }
            // the tail is updated
        }

        // update the head; if it was already updated this is a no-op
        Interlocked.CompareExchange(ref m_first, next, first);
    }

    #region IEnumerable implementation

    /// <summary>
    /// Walks the chunks starting from the one supplied at construction and
    /// yields the items between each chunk's current Low and Hi positions.
    /// The bounds are read on every step, so this is not a snapshot.
    /// </summary>
    class Enumerator : IEnumerator<T> {
        Chunk m_current;
        int m_pos = -1; // -1 means MoveNext has not been called yet

        public Enumerator(Chunk first) {
            m_current = first;
        }

        /// <summary>
        /// Advances to the next committed item.
        /// </summary>
        /// <returns>
        /// true if <see cref="Current"/> now refers to an item; false once
        /// the end of the chunk chain has been reached.
        /// </returns>
        public bool MoveNext() {
            if (m_current == null)
                return false;

            if (m_pos == -1)
                m_pos = m_current.Low;
            else
                m_pos++;

            // Skip chunks with nothing left to read. Returning true here
            // without a valid position would make Current throw, which is
            // what broke foreach over an empty or fully traversed queue.
            while (m_pos >= m_current.Hi) {
                m_current = m_current.next;
                if (m_current == null)
                    return false;
                m_pos = m_current.Low;
            }

            return true;
        }

        /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
        public void Reset() {
            throw new NotSupportedException();
        }

        object IEnumerator.Current {
            get {
                return Current;
            }
        }

        /// <summary>Nothing to release.</summary>
        public void Dispose() {
        }

        /// <summary>
        /// The item at the current position.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// MoveNext has not been called yet, or the enumeration has ended.
        /// </exception>
        public T Current {
            get {
                if (m_pos == -1 || m_current == null)
                    throw new InvalidOperationException();
                return m_current.GetAt(m_pos);
            }
        }
    }

    /// <summary>Returns an enumerator that starts at the current head chunk.</summary>
    public IEnumerator<T> GetEnumerator() {
        return new Enumerator(m_first);
    }

    IEnumerator IEnumerable.GetEnumerator() {
        return GetEnumerator();
    }

    #endregion
}
244 }