annotate Implab/Parallels/AsyncQueue.cs @ 126:f7b2b8bfbb8c v2

minor changes
author cin
date Mon, 26 Jan 2015 02:12:01 +0300
parents f803565868a4
children d86da8d2d4c3
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
5 using System.Diagnostics;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
6
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
8 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// A fixed-size segment of the queue storage. Writers reserve slots through
/// <c>m_alloc</c>, store their data and then publish it by advancing <c>m_hi</c>;
/// readers claim published slots by advancing <c>m_low</c>.
/// </summary>
class Chunk {
    // link to the following chunk, maintained by the owning queue
    public Chunk next;

    // index of the next slot to be read
    int m_low;
    // number of slots whose contents are published and may be read
    int m_hi;
    // number of slots reserved by writers (may exceed m_size once the chunk is full)
    int m_alloc;
    readonly int m_size;
    readonly T[] m_data;

    /// <summary>
    /// Creates an empty chunk with the specified capacity.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    public Chunk(int size) {
        m_data = new T[size];
        m_size = size;
    }

    /// <summary>
    /// Creates a chunk which already contains one published element.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    /// <param name="value">The element stored in the first slot.</param>
    public Chunk(int size, T value) {
        m_data = new T[size];
        m_data[0] = value;
        m_size = size;
        m_alloc = 1;
        m_hi = 1;
    }

    /// <summary>
    /// Creates a chunk pre-filled with a copy of the specified part of a buffer.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    /// <param name="data">The buffer to copy the elements from.</param>
    /// <param name="offset">The index in <paramref name="data"/> of the first element to copy.</param>
    /// <param name="length">The number of elements to copy; they become readable immediately.</param>
    /// <param name="alloc">The initial value of the reservation counter.</param>
    public Chunk(int size, T[] data, int offset, int length, int alloc) {
        m_data = new T[size];
        Array.Copy(data, offset, m_data, 0, length);
        m_size = size;
        m_hi = length;
        m_alloc = alloc;
    }

    /// <summary>
    /// The index of the next slot to be read.
    /// </summary>
    public int Low {
        get {
            return m_low;
        }
    }

    /// <summary>
    /// The number of published slots.
    /// </summary>
    public int Hi {
        get {
            return m_hi;
        }
    }

    /// <summary>
    /// Tries to store a single value in the chunk.
    /// </summary>
    /// <returns><c>true</c> if the value was stored, <c>false</c> if the chunk has no free slots.</returns>
    /// <param name="value">The value to store.</param>
    /// <param name="extend">Set to <c>true</c> only for the caller whose reservation landed
    /// exactly one slot past the end of the chunk.</param>
    public bool TryEnqueue(T value, out bool extend) {
        // reserve a slot: the counter value before the increment is the index we own
        var slot = Interlocked.Increment(ref m_alloc) - 1;

        if (slot < m_size) {
            extend = false;
            m_data[slot] = value;

            // publish the slot; writers holding earlier slots must publish first,
            // so spin until m_hi reaches our index
            while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
                // spin wait for commit
            }
            return true;
        }

        extend = (slot == m_size);
        return false;
    }

    /// <summary>
    /// Blocks further reservations in the chunk and waits until all pending writes are published.
    /// </summary>
    public void Commit() {
        // move the reservation counter past the end so every later writer is rejected
        var reserved = Interlocked.Exchange(ref m_alloc, m_size + 1);
        var actual = reserved <= m_size ? reserved : m_size;

        while (m_hi != actual)
            Thread.MemoryBarrier();
    }

    /// <summary>
    /// Tries to read a single value from the chunk.
    /// </summary>
    /// <returns><c>true</c> if a value was read, <c>false</c> if no published data is left.</returns>
    /// <param name="value">The value read, or <c>default(T)</c> on failure.</param>
    /// <param name="recycle">Set to <c>true</c> when the read position has reached, or this call
    /// has just consumed, the last slot of the chunk.</param>
    public bool TryDequeue(out T value, out bool recycle) {
        int head;
        for (;;) {
            head = m_low;
            if (head >= m_hi) {
                value = default(T);
                recycle = (head == m_size);
                return false;
            }
            // claim the slot; on contention re-read the position and try again
            if (Interlocked.CompareExchange(ref m_low, head + 1, head) == head)
                break;
        }

        recycle = (head == m_size - 1);
        value = m_data[head];

        return true;
    }

    /// <summary>
    /// Tries to store a range of values in the chunk.
    /// </summary>
    /// <returns><c>true</c> if at least one element was stored, <c>false</c> otherwise.</returns>
    /// <param name="batch">The buffer with the data to store.</param>
    /// <param name="offset">The index in <paramref name="batch"/> of the first element.</param>
    /// <param name="length">The number of elements the caller wants to store.</param>
    /// <param name="enqueued">The number of elements actually stored.</param>
    /// <param name="extend">Set to <c>true</c> when not all requested elements were stored
    /// although the reservation started inside the chunk or exactly at its end.</param>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
        // reserve 'length' slots at once; 'start' is the first reserved index
        var start = Interlocked.Add(ref m_alloc, length) - length;

        if (start > m_size) {
            // the chunk is full and someone is already creating the new one
            enqueued = 0;   // nothing was added
            extend = false; // the caller shouldn't try to extend the queue
            return false;
        }

        var room = m_size - start;
        enqueued = room <= length ? room : length;
        extend = enqueued < length;

        if (enqueued == 0)
            return false;

        Array.Copy(batch, offset, m_data, start, enqueued);

        // publish the copied range once all earlier reservations are published
        while (Interlocked.CompareExchange(ref m_hi, start + enqueued, start) != start) {
            // spin wait for commit
        }

        return true;
    }

    /// <summary>
    /// Tries to read up to <paramref name="length"/> values from the chunk.
    /// </summary>
    /// <returns><c>true</c> if a range was claimed, <c>false</c> if no published data is left.</returns>
    /// <param name="buffer">The buffer which receives the data.</param>
    /// <param name="offset">The index in <paramref name="buffer"/> to start writing at.</param>
    /// <param name="length">The maximum number of elements to read.</param>
    /// <param name="dequeued">The number of elements actually read.</param>
    /// <param name="recycle">Set to <c>true</c> when the read position has reached, or this call
    /// has just consumed up to, the end of the chunk.</param>
    public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
        int head, count;

        for (;;) {
            head = m_low;
            var published = m_hi;
            if (head >= published) {
                dequeued = 0;
                // recycling could be restarted and we need to signal again
                recycle = (head == m_size);
                return false;
            }
            count = Math.Min(published - head, length);
            // claim the whole range in one step; on contention start over
            if (Interlocked.CompareExchange(ref m_low, head + count, head) == head)
                break;
        }

        recycle = (head == m_size - count);
        dequeued = count;

        Array.Copy(m_data, head, buffer, offset, count);

        return true;
    }

    /// <summary>
    /// Returns the element stored in the specified slot without any synchronization.
    /// </summary>
    /// <param name="pos">The index of the slot.</param>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
146
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
147 public const int DEFAULT_CHUNK_SIZE = 32;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
148 public const int MAX_CHUNK_SIZE = 262144;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
149
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
150 Chunk m_first;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
151 Chunk m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
152
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Appends a single value to the end of the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public void Enqueue(T value) {
    var tail = m_last;
    bool extend = true;

    for (;;) {
        // fast path: the current last chunk accepted the value
        if (tail != null && tail.TryEnqueue(value, out extend))
            return;

        if (tail != null && !extend) {
            // the chunk is full and this caller wasn't asked to extend the queue:
            // spin until the last chunk is replaced, then retry with the new one
            while (tail == m_last) {
                Thread.MemoryBarrier();
            }
            tail = m_last;
            continue;
        }

        // either the queue has no chunks or we are the one who must extend it;
        // the new chunk already carries the value
        var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
        if (EnqueueChunk(tail, chunk))
            return; // success! exit!

        // the chunk wasn't attached, re-read the tail and start over
        tail = m_last;
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
176
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified range of data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The number of elements to read from the buffer, must be at least 1.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative, or
/// <paramref name="length"/> is less than 1 or exceeds the number of elements available
/// in <paramref name="data"/> starting at <paramref name="offset"/>.</exception>
public void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // Compare against the remaining space rather than computing offset + length:
    // the sum can overflow in an unchecked context and let an invalid range through,
    // which would reserve slots in a chunk and then fail in Array.Copy before they
    // are published, leaving later writers spinning on that chunk forever.
    if (length < 1 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;

    bool extend;
    int enqueued;

    while (length > 0) {
        // with no chunk in the queue we always have to create one
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue

            while (length > 0) {

                var size = Math.Min(length, MAX_CHUNK_SIZE);

                // The remaining length is passed as the initial reservation counter
                // (length >= size): if more data remains than was copied into this
                // chunk, the counter starts beyond the copied part.
                var chunk = new Chunk(
                    Math.Max(size, DEFAULT_CHUNK_SIZE),
                    data,
                    offset,
                    size,
                    length
                );

                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated, proceed from the beginning
                    last = m_last;
                    break;
                }

                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue if we successfully enqueued all the data
            if (length == 0)
                break;

            // otherwise someone else is extending the queue: spin until the last
            // chunk changes and retry with it
            while (last == m_last) {
                Thread.MemoryBarrier();
            }

            last = m_last;
        }
    }
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
245
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to remove and return the element at the head of the queue.
/// </summary>
/// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">The dequeued element, or <c>default(T)</c> when nothing was dequeued.</param>
public bool TryDequeue(out T value) {
    // always work with the current first chunk; it is re-read after every recycle
    for (var head = m_first; head != null; head = m_first) {
        bool recycle;
        var taken = head.TryDequeue(out value, out recycle);

        // the chunk is still usable, so whatever it reported is the actual result
        if (!recycle)
            return taken;

        // the chunk is exhausted and must be removed from the queue
        RecycleFirstChunk(head);

        // a successful read stays valid even though the chunk is being recycled
        if (taken)
            return true;

        // nothing was read from the exhausted chunk, try the next one
    }

    // the queue is empty
    value = default(T);
    return false;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
274
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue up to the specified amount of data from the queue,
/// reading from as many chunks as needed.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or does not fit into the buffer starting at <paramref name="offset"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space instead of computing offset + length,
    // the sum may overflow int and slip through the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;
    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            // advance the write position by the amount actually read
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        if (length == 0)
            return true; // the requested amount is fully satisfied

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return dequeued != 0;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
318
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or does not fit into the buffer starting at <paramref name="offset"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space instead of computing offset + length,
    // the sum may overflow int and slip through the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // the chunk is usable but holds no data (same situation TryDequeueRange
        // handles with a break): report failure instead of spinning until
        // some producer enqueues something
        if (!recycle)
            return false;

        // the head chunk was waste, the next one may still contain data
        // try again
        chunk = m_first;
    }

    return false;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
360
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Tries to install <paramref name="chunk"/> as the new tail of the queue.
/// </summary>
/// <returns>
/// <c>true</c> if the tail was swapped from <paramref name="last"/> to <paramref name="chunk"/>,
/// <c>false</c> if the tail was no longer <paramref name="last"/>.
/// </returns>
/// <param name="last">The tail the caller has observed, may be <c>null</c>.</param>
/// <param name="chunk">The chunk to append.</param>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    var observedTail = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (observedTail != last)
        return false; // the tail has been changed since the caller read it

    if (last == null) {
        // there was no tail, the new chunk also becomes the head
        m_first = chunk;
    } else {
        // link the previous tail to the new chunk
        last.next = chunk;
    }

    return true;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
372
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Unlinks the specified chunk from the head of the queue, provided it is still the head.
/// </summary>
/// <param name="first">The chunk which the caller has observed as the head of the queue.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // move the head to the successor, give up if the head is already changed
    var observedHead = Interlocked.CompareExchange(ref m_first, successor, first);
    if (observedHead != first)
        return;

    if (successor != null)
        return; // the head now points to the next chunk, the tail is untouched

    // the recycled chunk had no successor, so it should also be the tail:
    // try to reset the tail as well
    var observedTail = Interlocked.CompareExchange(ref m_last, null, first);
    if (observedTail != first) {
        // race
        // someone already updated the tail, restore the pointer to the queue head
        m_first = first;
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
398
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Discards the current contents of the queue by replacing its chunks with a single fresh chunk.
/// </summary>
public void Clear() {
    // the chunk which starts the new queue
    var replacement = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        if (tail == null)
            return; // nothing to clear

        // inconsistency: the head isn't set yet, retry
        if (head == null)
            continue;

        // inconsistency: the head has no successor but isn't the tail, retry
        if (head.next == null && head != tail)
            continue;

        // here we will create inconsistency which will force others to spin
        // and prevent from fetching. replacement.next = null
        if (Interlocked.CompareExchange(ref m_first, replacement, head) == head) {
            m_last = replacement;
            return;
        }

        // the head has been changed by someone else, retry
    }
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
425
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Detaches the current chunks from the queue, replacing them with a single fresh chunk,
/// and returns the elements read from the detached chunks.
/// </summary>
/// <returns>The elements read from the detached chunks, an empty array if the queue has no tail chunk.</returns>
public T[] Drain() {
    // the chunk which starts the new queue
    var replacement = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        if (tail == null)
            return new T[0];

        // inconsistency: the head isn't set yet, retry
        if (head == null)
            continue;

        // inconsistency: the head has no successor but isn't the tail, retry
        if (head.next == null && head != tail)
            continue;

        // here we will create inconsistency which will force others to spin
        // and prevent from fetching. replacement.next = null
        if (Interlocked.CompareExchange(ref m_first, replacement, head) != head)
            continue; // the head has been changed by someone else

        // publish the new tail and pick up the actual tail of the detached chain
        var detachedTail = Interlocked.Exchange(ref m_last, replacement);

        return ReadChunks(head, detachedTail);
    }
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
452
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
/// <summary>
/// Reads the chain of chunks from <paramref name="chunk"/> up to and including <paramref name="last"/>
/// and collects everything that could be dequeued from them.
/// </summary>
/// <returns>The collected elements in the order they were read.</returns>
/// <param name="chunk">The first chunk of the chain.</param>
/// <param name="last">The chunk at which reading stops, compared by reference.</param>
T[] ReadChunks(Chunk chunk, object last) {
    var items = new List<T>();
    var scratch = new T[DEFAULT_CHUNK_SIZE];

    var current = chunk;
    while (current != null) {
        // ensure all write operations on the chunk are complete
        current.Commit();

        // the chunk is read through the regular dequeue operation since
        // some client may still be completing its own dequeue on it,
        // such clients most likely won't get results
        int count;
        bool recycle;
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out recycle))
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));

        if (current == last)
            break; // the end of the chain is reached

        // spin until the link to the next chunk becomes visible
        while (current.next == null)
            Thread.MemoryBarrier();
        current = current.next;
    }

    return items.ToArray();
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
479
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
480 struct ArraySegmentCollection : ICollection<T> {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
481 readonly T[] m_data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
482 readonly int m_offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
483 readonly int m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
484
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
485 public ArraySegmentCollection(T[] data, int offset, int length) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
486 m_data = data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
487 m_offset = offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
488 m_length = length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
489 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
490
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
491 #region ICollection implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
492
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
493 public void Add(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
494 throw new InvalidOperationException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
495 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
496
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
497 public void Clear() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
498 throw new InvalidOperationException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
499 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
500
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
501 public bool Contains(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
502 return false;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
503 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
504
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
505 public void CopyTo(T[] array, int arrayIndex) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
506 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
507 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
508
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
509 public bool Remove(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
510 throw new NotImplementedException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
511 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
512
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
513 public int Count {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
514 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
515 return m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
516 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
517 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
518
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
519 public bool IsReadOnly {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
520 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
521 return true;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
522 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
523 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
524
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
525 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
526
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
527 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
528
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
529 public IEnumerator<T> GetEnumerator() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
530 for (int i = m_offset; i < m_length + m_offset; i++)
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
531 yield return m_data[i];
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
532 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
533
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
534 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
535
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
536 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
537
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Non-generic enumeration entry point; hands back the same iterator as the
/// generic <c>GetEnumerator()</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
541
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
542 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
543 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
544
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
545 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
546
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Forward-only enumerator over a linked list of chunks. Within each chunk it
/// visits the indices from <c>Low</c> up to, but not including, <c>Hi</c>, then
/// follows <c>next</c> to the following chunk.
/// </summary>
class Enumerator : IEnumerator<T> {
    // Chunk currently being read; null once the enumeration has run past the last chunk.
    Chunk m_current;

    // Index inside m_current; -1 means MoveNext has not been called yet.
    int m_pos = -1;

    /// <summary>
    /// Creates an enumerator positioned before the first element.
    /// </summary>
    /// <param name="fisrt">The chunk to start from; <c>null</c> produces an empty enumeration.</param>
    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>
    /// Advances the enumerator to the next element.
    /// </summary>
    /// <returns>
    /// <c>true</c> if <see cref="Current"/> now refers to an element;
    /// <c>false</c> if there are no more elements.
    /// </returns>
    public bool MoveNext() {
        if (m_current == null)
            return false;

        if (m_pos == -1)
            m_pos = m_current.Low;
        else
            m_pos++;

        // Step over every chunk that is exhausted (or was empty to begin with).
        // Running out of chunks must be reported as the end of the sequence;
        // returning true here would leave Current with nothing to read and make
        // it throw InvalidOperationException.
        while (m_pos >= m_current.Hi) {
            m_current = m_current.next;
            if (m_current == null)
                return false;

            // Start a new chunk at its own lower bound, the same way the very
            // first chunk is entered above.
            m_pos = m_current.Low;
        }

        return true;
    }

    /// <summary>
    /// Not supported by this enumerator.
    /// </summary>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Reset() {
        throw new NotSupportedException();
    }

    object IEnumerator.Current {
        get {
            return Current;
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>
    /// Does nothing; the body is empty.
    /// </summary>
    public void Dispose() {
    }

    #endregion

    #region IEnumerator implementation

    /// <summary>
    /// Gets the element at the current position.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// <c>MoveNext</c> has not been called yet, or the enumeration has already ended.
    /// </exception>
    public T Current {
        get {
            if (m_pos == -1 || m_current == null)
                throw new InvalidOperationException();
            return m_current.GetAt(m_pos);
        }
    }

    #endregion
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
604
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Creates a new enumerator that starts at the chunk referenced by <c>m_first</c>.
/// </summary>
/// <returns>A fresh <c>Enumerator</c> instance on every call.</returns>
public IEnumerator<T> GetEnumerator() {
    Chunk head = m_first;
    return new Enumerator(head);
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
608
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
609 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
610
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
611 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
612
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Non-generic enumeration entry point; returns the enumerator produced by the
/// generic <c>GetEnumerator()</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
616
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
617 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
618 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
619 }