Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | |
children | f1b897999260 |
comparison
equal
deleted
inserted
replaced
118:e046a94eecb1 | 119:2573b562e328 |
---|---|
1 using System.Threading; | |
2 using System.Collections.Generic; | |
3 using System; | |
4 using System.Collections; | |
5 | |
6 namespace Implab.Parallels { | |
7 public class AsyncQueue<T> : IEnumerable<T> { | |
8 class Chunk { | |
9 public Chunk next; | |
10 | |
11 int m_low; | |
12 int m_hi; | |
13 int m_alloc; | |
14 readonly int m_size; | |
15 readonly T[] m_data; | |
16 | |
17 public Chunk(int size) { | |
18 m_size = size; | |
19 m_data = new T[size]; | |
20 } | |
21 | |
22 public Chunk(int size, T value) { | |
23 m_size = size; | |
24 m_hi = 1; | |
25 m_alloc = 1; | |
26 m_data = new T[size]; | |
27 m_data[0] = value; | |
28 } | |
29 | |
30 public int Low { | |
31 get { return m_low; } | |
32 } | |
33 | |
34 public int Hi { | |
35 get { return m_hi; } | |
36 } | |
37 | |
38 public bool TryEnqueue(T value,out bool extend) { | |
39 extend = false; | |
40 int alloc; | |
41 do { | |
42 alloc = m_alloc; | |
43 if (alloc > m_size) | |
44 return false; | |
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); | |
46 | |
47 if (alloc == m_size) { | |
48 extend = true; | |
49 return false; | |
50 } | |
51 | |
52 m_data[alloc] = value; | |
53 | |
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | |
55 // spin wait for commit | |
56 } | |
57 return true; | |
58 } | |
59 | |
60 public bool TryDequeue(out T value,out bool recycle) { | |
61 int low; | |
62 do { | |
63 low = m_low; | |
64 if (low >= m_hi) { | |
65 value = default(T); | |
66 recycle = (low == m_size); | |
67 return false; | |
68 } | |
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); | |
70 | |
71 recycle = (low == m_size - 1); | |
72 value = m_data[low]; | |
73 | |
74 return true; | |
75 } | |
76 | |
77 public T GetAt(int pos) { | |
78 return m_data[pos]; | |
79 } | |
80 } | |
81 | |
82 public const int DEFAULT_CHUNK_SIZE = 32; | |
83 | |
84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | |
85 | |
86 Chunk m_first; | |
87 Chunk m_last; | |
88 | |
89 public AsyncQueue() { | |
90 m_last = m_first = new Chunk(m_chunkSize); | |
91 } | |
92 | |
93 public void Enqueue(T value) { | |
94 var last = m_last; | |
95 // spin wait to the new chunk | |
96 bool extend = true; | |
97 while(last == null || !last.TryEnqueue(value, out extend)) { | |
98 // try to extend queue | |
99 if (extend || last == null) { | |
100 var chunk = new Chunk(m_chunkSize, value); | |
101 if (EnqueueChunk(last, chunk)) | |
102 break; | |
103 last = m_last; | |
104 } else { | |
105 while (last != m_last) { | |
106 Thread.MemoryBarrier(); | |
107 last = m_last; | |
108 } | |
109 } | |
110 } | |
111 } | |
112 | |
113 public bool TryDequeue(out T value) { | |
114 var chunk = m_first; | |
115 bool recycle; | |
116 while (chunk != null) { | |
117 | |
118 var result = chunk.TryDequeue(out value, out recycle); | |
119 | |
120 if (recycle) // this chunk is waste | |
121 RecycleFirstChunk(chunk); | |
122 else | |
123 return result; // this chunk is usable and returned actual result | |
124 | |
125 if (result) // this chunk is waste but the true result is always actual | |
126 return true; | |
127 | |
128 // try again | |
129 chunk = m_first; | |
130 } | |
131 | |
132 // the queue is empty | |
133 value = default(T); | |
134 return false; | |
135 } | |
136 | |
137 bool EnqueueChunk(Chunk last, Chunk chunk) { | |
138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | |
139 return false; | |
140 | |
141 if (last != null) | |
142 last.next = chunk; | |
143 else | |
144 m_first = chunk; | |
145 return true; | |
146 } | |
147 | |
148 void RecycleFirstChunk(Chunk first) { | |
149 var next = first.next; | |
150 | |
151 if (next == null) { | |
152 // looks like this is the last chunk | |
153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |
154 // race | |
155 // maybe someone already recycled this chunk | |
156 // or a new chunk has been appedned to the queue | |
157 | |
158 return; // give up | |
159 } | |
160 // the tail is updated | |
161 } | |
162 | |
163 // we need to update the head | |
164 Interlocked.CompareExchange(ref m_first, next, first); | |
165 // if the head is already updated then give up | |
166 return; | |
167 | |
168 } | |
169 | |
170 #region IEnumerable implementation | |
171 | |
172 class Enumerator : IEnumerator<T> { | |
173 Chunk m_current; | |
174 int m_pos = -1; | |
175 | |
176 public Enumerator(Chunk fisrt) { | |
177 m_current = fisrt; | |
178 } | |
179 | |
180 #region IEnumerator implementation | |
181 | |
182 public bool MoveNext() { | |
183 if (m_current == null) | |
184 return false; | |
185 | |
186 if (m_pos == -1) | |
187 m_pos = m_current.Low; | |
188 else | |
189 m_pos++; | |
190 if (m_pos == m_current.Hi) { | |
191 m_pos = 0; | |
192 m_current = m_current.next; | |
193 } | |
194 | |
195 return true; | |
196 } | |
197 | |
198 public void Reset() { | |
199 throw new NotSupportedException(); | |
200 } | |
201 | |
202 object IEnumerator.Current { | |
203 get { | |
204 return Current; | |
205 } | |
206 } | |
207 | |
208 #endregion | |
209 | |
210 #region IDisposable implementation | |
211 | |
212 public void Dispose() { | |
213 } | |
214 | |
215 #endregion | |
216 | |
217 #region IEnumerator implementation | |
218 | |
219 public T Current { | |
220 get { | |
221 if (m_pos == -1 || m_current == null) | |
222 throw new InvalidOperationException(); | |
223 return m_current.GetAt(m_pos); | |
224 } | |
225 } | |
226 | |
227 #endregion | |
228 } | |
229 | |
230 public IEnumerator<T> GetEnumerator() { | |
231 return new Enumerator(m_first); | |
232 } | |
233 | |
234 #endregion | |
235 | |
236 #region IEnumerable implementation | |
237 | |
238 IEnumerator IEnumerable.GetEnumerator() { | |
239 return GetEnumerator(); | |
240 } | |
241 | |
242 #endregion | |
243 } | |
244 } |