annotate Implab/Parallels/AsyncQueue.cs @ 128:6241bff0cd64 v2

Added Signal class a lightweight alternative to ManualResetEvent
author cin
date Thu, 29 Jan 2015 05:09:31 +0300
parents d86da8d2d4c3
children 471f596b2603
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
5 using System.Diagnostics;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
6
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
8 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
9 class Chunk {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
10 public Chunk next;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
11
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
12 int m_low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
13 int m_hi;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
14 int m_alloc;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
15 readonly int m_size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
16 readonly T[] m_data;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
17
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
18 public Chunk(int size) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
19 m_size = size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
20 m_data = new T[size];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
21 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
22
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
23 public Chunk(int size, T value) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
24 m_size = size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
25 m_hi = 1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
26 m_alloc = 1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
27 m_data = new T[size];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
28 m_data[0] = value;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
29 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
30
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
31 public Chunk(int size, T[] data, int offset, int length, int alloc) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
32 m_size = size;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
33 m_hi = length;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
34 m_alloc = alloc;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
35 m_data = new T[size];
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
36 Array.Copy(data, offset, m_data, 0, length);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
37 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
38
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
39 public int Low {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
40 get { return m_low; }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
41 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
42
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
43 public int Hi {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
44 get { return m_hi; }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
45 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
46
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
47 public int Size {
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
48 get { return m_size; }
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
49 }
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
50
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
51 public bool TryEnqueue(T value, out bool extend) {
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
52 var alloc = Interlocked.Increment(ref m_alloc) - 1;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
53
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
54 if (alloc >= m_size) {
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
55 extend = alloc == m_size;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
56 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
57 }
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
58
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
59 extend = false;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
60 m_data[alloc] = value;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
61
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
62 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
63 // spin wait for commit
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
64 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
65 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
66 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
67
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
68 /// <summary>
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
69 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
70 /// </summary>
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
71 public void Commit() {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
72 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size + 1), m_size);
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
73
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
74 while (m_hi != actual)
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
75 Thread.MemoryBarrier();
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
76 }
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
77
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
78 public bool TryDequeue(out T value, out bool recycle) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
79 int low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
80 do {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
81 low = m_low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
82 if (low >= m_hi) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
83 value = default(T);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
84 recycle = (low == m_size);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
85 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
86 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
87 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
88
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
89 recycle = (low == m_size - 1);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
90 value = m_data[low];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
91
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
92 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
93 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
94
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
95 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
96 //int alloc;
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
97 //int allocSize;
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
98
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
99 var alloc = Interlocked.Add(ref m_alloc, length) - length;
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
100 if (alloc > m_size) {
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
101 // the chunk is full and someone already
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
102 // creating the new one
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
103 enqueued = 0; // nothing was added
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
104 extend = false; // the caller shouldn't try to extend the queue
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
105 return false; // nothing was added
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
106 }
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
107
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
108 enqueued = Math.Min(m_size - alloc, length);
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
109 extend = length > enqueued;
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
110
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
111 if (enqueued == 0)
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
112 return false;
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
113
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
114
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
116
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
117 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + enqueued, alloc)) {
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
118 // spin wait for commit
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
119 }
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
120
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
121 return true;
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
122 }
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
123
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
124 public bool TryDequeueBatch(T[] buffer, int offset, int length,out int dequeued, out bool recycle) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
125 int low, hi, batchSize;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
126
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
127 do {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
128 low = m_low;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
129 hi = m_hi;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
130 if (low >= hi) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
131 dequeued = 0;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
132 recycle = (low == m_size); // recycling could be restarted and we need to signal again
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
133 return false;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
134 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
135 batchSize = Math.Min(hi - low, length);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
136 } while(low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
137
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
138 recycle = (low == m_size - batchSize);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
139 dequeued = batchSize;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
140
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
141 Array.Copy(m_data, low, buffer, offset, batchSize);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
142
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
143 return true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
144 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
145
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
146 public T GetAt(int pos) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
147 return m_data[pos];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
148 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
149 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
150
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
151 public const int DEFAULT_CHUNK_SIZE = 32;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
152 public const int MAX_CHUNK_SIZE = 262144;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
153
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
154 Chunk m_first;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
155 Chunk m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
156
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
157 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
158 /// Adds the specified value to the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
159 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
160 /// <param name="value">Tha value which will be added to the queue.</param>
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
161 public void Enqueue(T value) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
162 var last = m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
163 // spin wait to the new chunk
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
164 bool extend = true;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
165 while (last == null || !last.TryEnqueue(value, out extend)) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
166 // try to extend queue
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
167 if (extend || last == null) {
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
168 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
169 if (EnqueueChunk(last, chunk))
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
170 break; // success! exit!
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
171 last = m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
172 } else {
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
173 while (last == m_last) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
174 Thread.MemoryBarrier();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
175 }
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
176 last = m_last;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
177 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
178 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
179 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
180
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
181 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
182 /// Adds the specified data to the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
183 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
184 /// <param name="data">The buffer which contains the data to be enqueued.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
185 /// <param name="offset">The offset of the data in the buffer.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
186 /// <param name="length">The size of the data to read from the buffer.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
187 public void EnqueueRange(T[] data, int offset, int length) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
188 if (data == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
189 throw new ArgumentNullException("data");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
190 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
191 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
192 if (length < 1 || offset + length > data.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
193 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
194
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
195 var last = m_last;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
196
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
197 bool extend;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
198 int enqueued;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
199
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
200 while (length > 0) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
201 extend = true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
202 if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
203 length -= enqueued;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
204 offset += enqueued;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
205 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
206
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
207 if (extend) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
208 // there was no enough space in the chunk
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
209 // or there was no chunks in the queue
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
210
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
211 while (length > 0) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
212
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
213 var size = Math.Min(length, MAX_CHUNK_SIZE);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
214
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
215 var chunk = new Chunk(
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
216 Math.Max(size, DEFAULT_CHUNK_SIZE),
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
217 data,
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
218 offset,
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
219 size,
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
220 length // length >= size
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
221 );
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
222
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
223 if (!EnqueueChunk(last, chunk)) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
224 // looks like the queue has been updated then proceed from the beginning
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
225 last = m_last;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
226 break;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
227 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
228
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
229 // we have successfully added the new chunk
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
230 last = chunk;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
231 length -= size;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
232 offset += size;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
233 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
234 } else {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
235 // we don't need to extend the queue, if we successfully enqueued data
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
236 if (length == 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
237 break;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
238
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
239 // if we need to wait while someone is extending the queue
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
240 // spinwait
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
241 while (last == m_last) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
242 Thread.MemoryBarrier();
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
243 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
244
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
245 last = m_last;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
246 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
247 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
248 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
249
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
250 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
251 /// Tries to retrieve the first element from the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
252 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
253 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
254 /// <param name="value">The value of the dequeued element.</param>
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
255 public bool TryDequeue(out T value) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
256 var chunk = m_first;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
257 bool recycle;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
258 while (chunk != null) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
259
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
260 var result = chunk.TryDequeue(out value, out recycle);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
261
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
262 if (recycle) // this chunk is waste
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
263 RecycleFirstChunk(chunk);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
264 else
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
265 return result; // this chunk is usable and returned actual result
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
266
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
267 if (result) // this chunk is waste but the true result is always actual
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
268 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
269
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
270 // try again
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
271 chunk = m_first;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
272 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
273
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
274 // the queue is empty
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
275 value = default(T);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
276 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
277 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
278
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
279 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
280 /// Tries to dequeue the specified amount of data from the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
281 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
282 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
283 /// <param name="buffer">The buffer to which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
284 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
285 /// <param name="length">The maximum amount of data to be retrieved.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
286 /// <param name="dequeued">The actual amout of the retrieved data.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
287 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
288 if (buffer == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
289 throw new ArgumentNullException("buffer");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
290 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
291 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
292 if (length < 1 || offset + length > buffer.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
293 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
294
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
295 var chunk = m_first;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
296 bool recycle;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
297 dequeued = 0;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
298 while (chunk != null) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
299
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
300 int actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
301 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
302 offset += actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
303 length -= actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
304 dequeued += actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
305 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
306
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
307 if (recycle) // this chunk is waste
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
308 RecycleFirstChunk(chunk);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
309 else if (actual == 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
310 break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
311
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
312 if (length == 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
313 return true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
314
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
315 // we still may dequeue something
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
316 // try again
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
317 chunk = m_first;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
318 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
319
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
320 return dequeued != 0;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
321 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
322
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
323 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
324 /// Tries to dequeue all remaining data in the first chunk.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
325 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
326 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
327 /// <param name="buffer">The buffer to which the data will be written.</param>
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
328 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
329 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
330 /// <param name="dequeued">The actual amount of the dequeued data.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
331 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
332 if (buffer == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
333 throw new ArgumentNullException("buffer");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
334 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
335 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
336 if (length < 1 || offset + length > buffer.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
337 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
338
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
339 var chunk = m_first;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
340 bool recycle;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
341 dequeued = 0;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
342
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
343 while (chunk != null) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
344
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
345 int actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
346 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
347 dequeued = actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
348 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
349
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
350 if (recycle) // this chunk is waste
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
351 RecycleFirstChunk(chunk);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
352
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
353 // if we have dequeued any data, then return
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
354 if (dequeued != 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
355 return true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
356
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
357 // we still may dequeue something
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
358 // try again
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
359 chunk = m_first;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
360 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
361
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
362 return false;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
363 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
364
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
365 bool EnqueueChunk(Chunk last, Chunk chunk) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
366 if (Interlocked.CompareExchange(ref m_last, chunk, last) != last)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
367 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
368
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
369 if (last != null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
370 last.next = chunk;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
371 else {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
372 m_first = chunk;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
373 }
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
374 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
375 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
376
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
377 void RecycleFirstChunk(Chunk first) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
378 var next = first.next;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
379
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
380 if (first != Interlocked.CompareExchange(ref m_first, next, first))
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
381 return;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
382
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
383 if (next == null) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
384
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
385 if (first != Interlocked.CompareExchange(ref m_last, null, first)) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
386 /*while (first.next == null)
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
387 Thread.MemoryBarrier();*/
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
388
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
389 // race
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
390 // someone already updated the tail, restore the pointer to the queue head
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
391 m_first = first;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
392 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
393 // the tail is updated
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
394 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
395
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
396 // we need to update the head
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
397 //Interlocked.CompareExchange(ref m_first, next, first);
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
398 // if the head is already updated then give up
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
399 //return;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
400
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
401 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
402
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
403 public void Clear() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
404 // start the new queue
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
405 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
406
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
407 do {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
408 Thread.MemoryBarrier();
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
409 var first = m_first;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
410 var last = m_last;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
411
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
412 if (last == null) // nothing to clear
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
413 return;
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
414
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
415 if (first == null || (first.next == null && first != last)) // inconcistency
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
416 continue;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
417
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
418 // here we will create inconsistency which will force others to spin
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
419 // and prevent from fetching. chunk.next = null
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
420 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
421 continue;// inconsistent
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
422
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
423 m_last = chunk;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
424
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
425 return;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
426
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
427 } while(true);
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
428 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
429
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
430 public T[] Drain() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
431 // start the new queue
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
432 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
433
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
434 do {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
435 Thread.MemoryBarrier();
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
436 var first = m_first;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
437 var last = m_last;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
438
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
439 if (last == null)
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
440 return new T[0];
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
441
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
442 if (first == null || (first.next == null && first != last))
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
443 continue;
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
444
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
445 // here we will create inconsistency which will force others to spin
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
446 // and prevent from fetching. chunk.next = null
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
447 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
448 continue;// inconsistent
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
449
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
450 last = Interlocked.Exchange(ref m_last, chunk);
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
451
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
452 return ReadChunks(first, last);
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
453
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
454 } while(true);
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
455 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
456
128
6241bff0cd64 Added Signal class a lightweight alternative to ManualResetEvent
cin
parents: 127
diff changeset
457 static T[] ReadChunks(Chunk chunk, object last) {
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
458 var result = new List<T>();
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
459 var buffer = new T[DEFAULT_CHUNK_SIZE];
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
460 int actual;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
461 bool recycle;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
462 while (chunk != null) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
463 // ensure all write operations on the chunk are complete
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
464 chunk.Commit();
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
465
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
466 // we need to read the chunk using this way
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
467 // since some client still may completing the dequeue
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
468 // operation, such clients most likely won't get results
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
469 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
470 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
471
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
472 if (chunk == last) {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
473 chunk = null;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
474 } else {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
475 while (chunk.next == null)
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
476 Thread.MemoryBarrier();
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
477 chunk = chunk.next;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
478 }
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
479 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
480
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
481 return result.ToArray();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
482 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
483
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
484 struct ArraySegmentCollection : ICollection<T> {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
485 readonly T[] m_data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
486 readonly int m_offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
487 readonly int m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
488
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
489 public ArraySegmentCollection(T[] data, int offset, int length) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
490 m_data = data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
491 m_offset = offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
492 m_length = length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
493 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
494
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
495 #region ICollection implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
496
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
497 public void Add(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
498 throw new InvalidOperationException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
499 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
500
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
501 public void Clear() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
502 throw new InvalidOperationException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
503 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
504
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
505 public bool Contains(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
506 return false;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
507 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
508
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
509 public void CopyTo(T[] array, int arrayIndex) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
510 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
511 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
512
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
513 public bool Remove(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
514 throw new NotImplementedException();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
515 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
516
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
517 public int Count {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
518 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
519 return m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
520 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
521 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
522
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
523 public bool IsReadOnly {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
524 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
525 return true;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
526 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
527 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
528
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
529 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
530
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
531 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
532
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
533 public IEnumerator<T> GetEnumerator() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
534 for (int i = m_offset; i < m_length + m_offset; i++)
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
535 yield return m_data[i];
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
536 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
537
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
538 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
539
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
540 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
541
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
542 IEnumerator IEnumerable.GetEnumerator() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
543 return GetEnumerator();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
544 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
545
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
546 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
547 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
548
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
549 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
550
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
551 class Enumerator : IEnumerator<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
552 Chunk m_current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
553 int m_pos = -1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
554
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
555 public Enumerator(Chunk fisrt) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
556 m_current = fisrt;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
557 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
558
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
559 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
560
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
561 public bool MoveNext() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
562 if (m_current == null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
563 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
564
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
565 if (m_pos == -1)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
566 m_pos = m_current.Low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
567 else
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
568 m_pos++;
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
569
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
570 if (m_pos == m_current.Hi) {
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
571
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
572 m_current = m_pos == m_current.Size ? m_current.next : null;
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
573
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
574 m_pos = 0;
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
575
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
576 if (m_current == null)
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
577 return false;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
578 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
579
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
580 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
581 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
582
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
583 public void Reset() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
584 throw new NotSupportedException();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
585 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
586
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
587 object IEnumerator.Current {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
588 get {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
589 return Current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
590 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
591 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
592
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
593 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
594
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
595 #region IDisposable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
596
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
597 public void Dispose() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
598 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
599
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
600 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
601
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
602 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
603
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
604 public T Current {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
605 get {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
606 if (m_pos == -1 || m_current == null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
607 throw new InvalidOperationException();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
608 return m_current.GetAt(m_pos);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
609 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
610 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
611
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
612 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
613 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
614
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
615 public IEnumerator<T> GetEnumerator() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
616 return new Enumerator(m_first);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
617 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
618
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
619 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
620
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
621 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
622
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
623 IEnumerator IEnumerable.GetEnumerator() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
624 return GetEnumerator();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
625 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
626
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
627 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
628 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
629 }