Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
| author | cin | 
|---|---|
| date | Sun, 11 Jan 2015 19:13:02 +0300 | 
| parents | |
| children | f1b897999260 | 
   comparison
  equal
  deleted
  inserted
  replaced
| 118:e046a94eecb1 | 119:2573b562e328 | 
|---|---|
| 1 using System.Threading; | |
| 2 using System.Collections.Generic; | |
| 3 using System; | |
| 4 using System.Collections; | |
| 5 | |
| 6 namespace Implab.Parallels { | |
| 7 public class AsyncQueue<T> : IEnumerable<T> { | |
| 8 class Chunk { | |
| 9 public Chunk next; | |
| 10 | |
| 11 int m_low; | |
| 12 int m_hi; | |
| 13 int m_alloc; | |
| 14 readonly int m_size; | |
| 15 readonly T[] m_data; | |
| 16 | |
| 17 public Chunk(int size) { | |
| 18 m_size = size; | |
| 19 m_data = new T[size]; | |
| 20 } | |
| 21 | |
| 22 public Chunk(int size, T value) { | |
| 23 m_size = size; | |
| 24 m_hi = 1; | |
| 25 m_alloc = 1; | |
| 26 m_data = new T[size]; | |
| 27 m_data[0] = value; | |
| 28 } | |
| 29 | |
| 30 public int Low { | |
| 31 get { return m_low; } | |
| 32 } | |
| 33 | |
| 34 public int Hi { | |
| 35 get { return m_hi; } | |
| 36 } | |
| 37 | |
| 38 public bool TryEnqueue(T value,out bool extend) { | |
| 39 extend = false; | |
| 40 int alloc; | |
| 41 do { | |
| 42 alloc = m_alloc; | |
| 43 if (alloc > m_size) | |
| 44 return false; | |
| 45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); | |
| 46 | |
| 47 if (alloc == m_size) { | |
| 48 extend = true; | |
| 49 return false; | |
| 50 } | |
| 51 | |
| 52 m_data[alloc] = value; | |
| 53 | |
| 54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | |
| 55 // spin wait for commit | |
| 56 } | |
| 57 return true; | |
| 58 } | |
| 59 | |
| 60 public bool TryDequeue(out T value,out bool recycle) { | |
| 61 int low; | |
| 62 do { | |
| 63 low = m_low; | |
| 64 if (low >= m_hi) { | |
| 65 value = default(T); | |
| 66 recycle = (low == m_size); | |
| 67 return false; | |
| 68 } | |
| 69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); | |
| 70 | |
| 71 recycle = (low == m_size - 1); | |
| 72 value = m_data[low]; | |
| 73 | |
| 74 return true; | |
| 75 } | |
| 76 | |
| 77 public T GetAt(int pos) { | |
| 78 return m_data[pos]; | |
| 79 } | |
| 80 } | |
| 81 | |
| 82 public const int DEFAULT_CHUNK_SIZE = 32; | |
| 83 | |
| 84 readonly int m_chunkSize = DEFAULT_CHUNK_SIZE; | |
| 85 | |
| 86 Chunk m_first; | |
| 87 Chunk m_last; | |
| 88 | |
| 89 public AsyncQueue() { | |
| 90 m_last = m_first = new Chunk(m_chunkSize); | |
| 91 } | |
| 92 | |
| 93 public void Enqueue(T value) { | |
| 94 var last = m_last; | |
| 95 // spin wait to the new chunk | |
| 96 bool extend = true; | |
| 97 while(last == null || !last.TryEnqueue(value, out extend)) { | |
| 98 // try to extend queue | |
| 99 if (extend || last == null) { | |
| 100 var chunk = new Chunk(m_chunkSize, value); | |
| 101 if (EnqueueChunk(last, chunk)) | |
| 102 break; | |
| 103 last = m_last; | |
| 104 } else { | |
| 105 while (last != m_last) { | |
| 106 Thread.MemoryBarrier(); | |
| 107 last = m_last; | |
| 108 } | |
| 109 } | |
| 110 } | |
| 111 } | |
| 112 | |
| 113 public bool TryDequeue(out T value) { | |
| 114 var chunk = m_first; | |
| 115 bool recycle; | |
| 116 while (chunk != null) { | |
| 117 | |
| 118 var result = chunk.TryDequeue(out value, out recycle); | |
| 119 | |
| 120 if (recycle) // this chunk is waste | |
| 121 RecycleFirstChunk(chunk); | |
| 122 else | |
| 123 return result; // this chunk is usable and returned actual result | |
| 124 | |
| 125 if (result) // this chunk is waste but the true result is always actual | |
| 126 return true; | |
| 127 | |
| 128 // try again | |
| 129 chunk = m_first; | |
| 130 } | |
| 131 | |
| 132 // the queue is empty | |
| 133 value = default(T); | |
| 134 return false; | |
| 135 } | |
| 136 | |
| 137 bool EnqueueChunk(Chunk last, Chunk chunk) { | |
| 138 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last) | |
| 139 return false; | |
| 140 | |
| 141 if (last != null) | |
| 142 last.next = chunk; | |
| 143 else | |
| 144 m_first = chunk; | |
| 145 return true; | |
| 146 } | |
| 147 | |
| 148 void RecycleFirstChunk(Chunk first) { | |
| 149 var next = first.next; | |
| 150 | |
| 151 if (next == null) { | |
| 152 // looks like this is the last chunk | |
| 153 if (first != Interlocked.CompareExchange(ref m_last, null, first)) { | |
| 154 // race | |
| 155 // maybe someone already recycled this chunk | |
| 156 // or a new chunk has been appedned to the queue | |
| 157 | |
| 158 return; // give up | |
| 159 } | |
| 160 // the tail is updated | |
| 161 } | |
| 162 | |
| 163 // we need to update the head | |
| 164 Interlocked.CompareExchange(ref m_first, next, first); | |
| 165 // if the head is already updated then give up | |
| 166 return; | |
| 167 | |
| 168 } | |
| 169 | |
| 170 #region IEnumerable implementation | |
| 171 | |
| 172 class Enumerator : IEnumerator<T> { | |
| 173 Chunk m_current; | |
| 174 int m_pos = -1; | |
| 175 | |
| 176 public Enumerator(Chunk fisrt) { | |
| 177 m_current = fisrt; | |
| 178 } | |
| 179 | |
| 180 #region IEnumerator implementation | |
| 181 | |
| 182 public bool MoveNext() { | |
| 183 if (m_current == null) | |
| 184 return false; | |
| 185 | |
| 186 if (m_pos == -1) | |
| 187 m_pos = m_current.Low; | |
| 188 else | |
| 189 m_pos++; | |
| 190 if (m_pos == m_current.Hi) { | |
| 191 m_pos = 0; | |
| 192 m_current = m_current.next; | |
| 193 } | |
| 194 | |
| 195 return true; | |
| 196 } | |
| 197 | |
| 198 public void Reset() { | |
| 199 throw new NotSupportedException(); | |
| 200 } | |
| 201 | |
| 202 object IEnumerator.Current { | |
| 203 get { | |
| 204 return Current; | |
| 205 } | |
| 206 } | |
| 207 | |
| 208 #endregion | |
| 209 | |
| 210 #region IDisposable implementation | |
| 211 | |
| 212 public void Dispose() { | |
| 213 } | |
| 214 | |
| 215 #endregion | |
| 216 | |
| 217 #region IEnumerator implementation | |
| 218 | |
| 219 public T Current { | |
| 220 get { | |
| 221 if (m_pos == -1 || m_current == null) | |
| 222 throw new InvalidOperationException(); | |
| 223 return m_current.GetAt(m_pos); | |
| 224 } | |
| 225 } | |
| 226 | |
| 227 #endregion | |
| 228 } | |
| 229 | |
| 230 public IEnumerator<T> GetEnumerator() { | |
| 231 return new Enumerator(m_first); | |
| 232 } | |
| 233 | |
| 234 #endregion | |
| 235 | |
| 236 #region IEnumerable implementation | |
| 237 | |
| 238 IEnumerator IEnumerable.GetEnumerator() { | |
| 239 return GetEnumerator(); | |
| 240 } | |
| 241 | |
| 242 #endregion | |
| 243 } | |
| 244 } | 
