annotate Implab/Parallels/AsyncQueue.cs @ 209:a867536c68fc v2

Bound promise to CancellationToken Added new states to ExecutionSate enum. Added Safe.Guard() method to handle cleanup of the result of the promise
author cin
date Wed, 16 Nov 2016 03:06:08 +0300
parents 238e15580926
children 8d5de4eb9c2c
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
5 using System.Diagnostics;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
6
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
8 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Fixed-capacity segment of the queue backed by an array.
/// Writers reserve slots through <c>m_alloc</c> and publish them through <c>m_hi</c>;
/// readers take slots by advancing <c>m_low</c>. All three counters are updated with
/// <see cref="Interlocked"/> operations.
/// </summary>
class Chunk {
    // Link to another chunk; it is never accessed inside Chunk itself
    // (presumably maintained by the owning queue - confirm against EnqueueChunk).
    public Chunk next;

    int m_low;            // index of the next slot to be dequeued
    int m_hi;             // slots in [m_low, m_hi) contain published values
    int m_alloc;          // number of slots requested by writers, may exceed m_size
    readonly int m_size;  // capacity of m_data
    readonly T[] m_data;

    /// <summary>
    /// Creates an empty chunk with the specified capacity.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    public Chunk(int size) {
        m_size = size;
        m_data = new T[size];
    }

    /// <summary>
    /// Creates a chunk with the specified capacity which already contains one value.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    /// <param name="value">The value placed into the first slot.</param>
    public Chunk(int size, T value)
        : this(size) {
        m_data[0] = value;
        m_hi = 1;
        m_alloc = 1;
    }

    /// <summary>
    /// Creates a chunk pre-filled with a copy of a range of the specified array.
    /// </summary>
    /// <param name="size">The number of slots in the chunk.</param>
    /// <param name="data">The array to copy the items from.</param>
    /// <param name="offset">The index of the first item to copy.</param>
    /// <param name="length">The number of items to copy; they become immediately readable.</param>
    /// <param name="alloc">The initial value of the allocation counter.</param>
    public Chunk(int size, T[] data, int offset, int length, int alloc)
        : this(size) {
        m_hi = length;
        m_alloc = alloc;
        Array.Copy(data, offset, m_data, 0, length);
    }

    /// <summary>
    /// The index of the next slot to be dequeued.
    /// </summary>
    public int Low {
        get {
            return m_low;
        }
    }

    /// <summary>
    /// The index following the last published slot.
    /// </summary>
    public int Hi {
        get {
            return m_hi;
        }
    }

    /// <summary>
    /// The capacity of the chunk.
    /// </summary>
    public int Size {
        get {
            return m_size;
        }
    }

    /// <summary>
    /// Tries to store a single value in the chunk.
    /// </summary>
    /// <returns><c>true</c> if the value has been stored, <c>false</c> if there was no free slot.</returns>
    /// <param name="value">The value to store.</param>
    /// <param name="extend">Set to <c>true</c> only when the reserved index is equal to the
    /// capacity of the chunk, otherwise <c>false</c>.</param>
    public bool TryEnqueue(T value, out bool extend) {
        // reserve a slot
        var slot = Interlocked.Increment(ref m_alloc) - 1;

        if (slot < m_size) {
            extend = false;
            m_data[slot] = value;

            // publish the slot: spin until all preceding slots are published
            while (Interlocked.CompareExchange(ref m_hi, slot + 1, slot) != slot) {
            }
            return true;
        }

        extend = (slot == m_size);
        return false;
    }

    /// <summary>
    /// Prevents from allocating new space in the chunk and waits for all write operations to complete
    /// </summary>
    public void Commit() {
        // any further reservation will get an index greater than the capacity
        var reserved = Interlocked.Exchange(ref m_alloc, m_size + 1);
        var expected = Math.Min(reserved, m_size);

        // wait until every reserved slot is published
        while (m_hi != expected)
            Thread.MemoryBarrier();
    }

    /// <summary>
    /// Tries to take a single value from the chunk.
    /// </summary>
    /// <returns><c>true</c> if a value has been taken, <c>false</c> if no published value was available.</returns>
    /// <param name="value">The dequeued value or <c>default(T)</c>.</param>
    /// <param name="recycle">Set to <c>true</c> when the last slot of the chunk has been taken by
    /// this call, or when nothing was taken and the read position is at the end of the chunk.</param>
    public bool TryDequeue(out T value, out bool recycle) {
        for (;;) {
            var pos = m_low;

            if (pos >= m_hi) {
                value = default(T);
                recycle = (pos == m_size);
                return false;
            }

            // claim the slot, retry if another reader was faster
            if (Interlocked.CompareExchange(ref m_low, pos + 1, pos) == pos) {
                recycle = (pos == m_size - 1);
                value = m_data[pos];
                return true;
            }
        }
    }

    /// <summary>
    /// Tries to store a range of items in the chunk.
    /// </summary>
    /// <returns><c>true</c> if at least one item has been stored, <c>false</c> otherwise.</returns>
    /// <param name="batch">The array which contains the items.</param>
    /// <param name="offset">The index of the first item in <paramref name="batch"/>.</param>
    /// <param name="length">The number of items to store.</param>
    /// <param name="enqueued">The number of items actually stored.</param>
    /// <param name="extend">Set to <c>true</c> when not all of the requested items fit into the
    /// chunk; <c>false</c> when the reservation started beyond the capacity.</param>
    public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
        // reserve the whole range at once
        var start = Interlocked.Add(ref m_alloc, length) - length;

        if (start > m_size) {
            // the chunk is full and someone is already creating the new one
            enqueued = 0;   // nothing was added
            extend = false; // the caller shouldn't try to extend the queue
            return false;
        }

        enqueued = Math.Min(m_size - start, length);
        extend = (enqueued < length);

        if (enqueued == 0)
            return false;

        Array.Copy(batch, offset, m_data, start, enqueued);

        // publish the range: spin until all preceding slots are published
        while (Interlocked.CompareExchange(ref m_hi, start + enqueued, start) != start) {
        }

        return true;
    }

    /// <summary>
    /// Tries to take up to <paramref name="length"/> items from the chunk.
    /// </summary>
    /// <returns><c>true</c> if a range has been taken, <c>false</c> if no published value was available.</returns>
    /// <param name="buffer">The array which receives the items.</param>
    /// <param name="offset">The index in <paramref name="buffer"/> where the first item is written.</param>
    /// <param name="length">The maximum number of items to take.</param>
    /// <param name="dequeued">The number of items actually taken.</param>
    /// <param name="recycle">Set to <c>true</c> when the taken range ends at the end of the chunk,
    /// or when nothing was taken and the read position is at the end of the chunk.</param>
    public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
        int from, count;

        for (;;) {
            from = m_low;
            var to = m_hi;

            if (from >= to) {
                dequeued = 0;
                recycle = (from == m_size); // recycling could be restarted and we need to signal again
                return false;
            }

            count = Math.Min(to - from, length);

            // claim the range, retry if another reader was faster
            if (Interlocked.CompareExchange(ref m_low, from + count, from) == from)
                break;
        }

        recycle = (from == m_size - count);
        dequeued = count;

        Array.Copy(m_data, from, buffer, offset, count);

        return true;
    }

    /// <summary>
    /// Returns the content of the slot at the specified position without any checks
    /// against the read and write positions.
    /// </summary>
    /// <param name="pos">The index of the slot.</param>
    public T GetAt(int pos) {
        return m_data[pos];
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
150
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
// Capacity of the chunk created by Enqueue for a single value and
// the minimal capacity of the chunks created by EnqueueRange.
public const int DEFAULT_CHUNK_SIZE = 32;

// The maximum number of items EnqueueRange copies into a single new chunk.
public const int MAX_CHUNK_SIZE = 256 * 1024;

// The chunk TryDequeue reads from.
private Chunk m_first;

// The chunk Enqueue and EnqueueRange write to.
private Chunk m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
156
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified value to the queue.
/// </summary>
/// <param name="value">The value which will be added to the queue.</param>
public virtual void Enqueue(T value) {
    var tail = m_last;
    var mustExtend = true;

    for (;;) {
        // the fast path: there is a chunk and it has a free slot
        if (tail != null && tail.TryEnqueue(value, out mustExtend))
            return;

        if (tail == null || mustExtend) {
            // the queue is empty or this caller has been chosen to extend it,
            // the value is placed directly into the new chunk
            if (EnqueueChunk(tail, new Chunk(DEFAULT_CHUNK_SIZE, value)))
                return; // success! exit!
        } else {
            // someone else is extending the queue, spin wait for the new chunk
            while (tail == m_last)
                Thread.MemoryBarrier();
        }

        // start over with the actual last chunk
        tail = m_last;
    }
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
180
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Adds the specified data to the queue.
/// </summary>
/// <param name="data">The buffer which contains the data to be enqueued.</param>
/// <param name="offset">The offset of the data in the buffer.</param>
/// <param name="length">The size of the data to read from the buffer.</param>
/// <exception cref="ArgumentNullException"><paramref name="data"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> is negative, or
/// <paramref name="length"/> is negative or exceeds the number of items available in
/// <paramref name="data"/> starting from <paramref name="offset"/>.</exception>
public virtual void EnqueueRange(T[] data, int offset, int length) {
    if (data == null)
        throw new ArgumentNullException("data");
    if (length == 0)
        return;
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // The range is compared against 'data.Length - offset' instead of computing
    // 'offset + length': the sum can overflow and slip through the check, then
    // the copy fails after the slots in the chunk are already reserved.
    if (length < 0 || length > data.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var last = m_last;

    bool extend;
    int enqueued;

    while (length > 0) {
        // when there are no chunks in the queue we need to create the first one
        extend = true;
        if (last != null && last.TryEnqueueBatch(data, offset, length, out enqueued, out extend)) {
            length -= enqueued;
            offset += enqueued;
        }

        if (extend) {
            // there was not enough space in the chunk
            // or there were no chunks in the queue

            while (length > 0) {

                var size = Math.Min(length, MAX_CHUNK_SIZE);

                var chunk = new Chunk(
                    Math.Max(size, DEFAULT_CHUNK_SIZE),
                    data,
                    offset,
                    size,
                    length // length >= size, the remaining data is counted as already allocated
                );

                if (!EnqueueChunk(last, chunk)) {
                    // looks like the queue has been updated then proceed from the beginning
                    last = m_last;
                    break;
                }

                // we have successfully added the new chunk
                last = chunk;
                length -= size;
                offset += size;
            }
        } else {
            // we don't need to extend the queue, if we successfully enqueued data
            if (length == 0)
                break;

            // otherwise someone else is extending the queue,
            // spin wait until the new chunk appears
            while (last == m_last) {
                Thread.MemoryBarrier();
            }

            last = m_last;
        }
    }
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
251
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Attempts to take a single element from the head of the queue.
/// </summary>
/// <returns><c>true</c> when an element was dequeued, <c>false</c> otherwise.</returns>
/// <param name="value">Receives the dequeued element, or <c>default(T)</c> when there is no head chunk.</param>
public bool TryDequeue(out T value) {
    // the head is re-read from the field on every pass, since recycling may replace it
    for (var head = m_first; head != null; head = m_first) {
        bool exhausted;
        bool success = head.TryDequeue(out value, out exhausted);

        // the chunk is still usable, so whatever it reported is the final answer
        if (!exhausted)
            return success;

        // the chunk asked to be recycled, detach it from the head of the queue
        RecycleFirstChunk(head);

        // a successful result stays valid even though the chunk is recycled
        if (success)
            return true;

        // nothing was obtained from the recycled chunk, retry with the current head
    }

    // there is no head chunk, the queue is empty
    value = default(T);
    return false;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
280
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue the specified amount of data from the queue.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of data to be retrieved.</param>
/// <param name="dequeued">The actual amount of the retrieved data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or does not fit into the buffer starting at <paramref name="offset"/>.
/// </exception>
public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space instead of computing offset + length:
    // the sum may overflow and wrap to a negative value, which would pass the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;
    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            // advance the write position by the amount copied from this chunk
            offset += actual;
            length -= actual;
            dequeued += actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);
        else if (actual == 0)
            break; // the chunk is usable, but doesn't contain any data (it's the last chunk in the queue)

        // the requested amount is fully satisfied
        if (length == 0)
            return true;

        // we still may dequeue something
        // try again
        chunk = m_first;
    }

    return dequeued != 0;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
324
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
/// <summary>
/// Tries to dequeue all remaining data in the first chunk.
/// </summary>
/// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
/// <param name="buffer">The buffer to which the data will be written.</param>
/// <param name="offset">The offset in the buffer at which the data will be written.</param>
/// <param name="length">The maximum amount of the data to be dequeued.</param>
/// <param name="dequeued">The actual amount of the dequeued data.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is negative, or <paramref name="length"/> is less than 1
/// or does not fit into the buffer starting at <paramref name="offset"/>.
/// </exception>
public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentOutOfRangeException("offset");
    // compare against the remaining space instead of computing offset + length:
    // the sum may overflow and wrap to a negative value, which would pass the check
    if (length < 1 || length > buffer.Length - offset)
        throw new ArgumentOutOfRangeException("length");

    var chunk = m_first;
    bool recycle;
    dequeued = 0;

    while (chunk != null) {

        int actual;
        if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
            dequeued = actual;
        }

        if (recycle) // this chunk is waste
            RecycleFirstChunk(chunk);

        // if we have dequeued any data, then return
        if (dequeued != 0)
            return true;

        // The chunk is usable but gave us nothing: stop instead of re-reading the
        // same head and spinning. This is the same stop condition TryDequeueRange uses.
        // NOTE(review): assumes TryDequeueBatch reports recycle == false with nothing
        // dequeued only when the chunk is currently empty - confirm against Chunk.
        if (!recycle)
            break;

        // the chunk was recycled without yielding data, a following chunk may have some
        // try again
        chunk = m_first;
    }

    return false;
}
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
366
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Attempts to append <paramref name="chunk"/> after the chunk the caller observed as the tail.
/// </summary>
/// <returns><c>true</c> if the tail was replaced by this call, <c>false</c> if the tail
/// was no longer <paramref name="last"/>.</returns>
/// <param name="last">The tail chunk as observed by the caller, <c>null</c> if there was none.</param>
/// <param name="chunk">The chunk to append.</param>
bool EnqueueChunk(Chunk last, Chunk chunk) {
    // publish the new tail only if nobody has changed it since the caller read it
    var previousTail = Interlocked.CompareExchange(ref m_last, chunk, last);
    if (previousTail != last)
        return false;

    if (last == null) {
        // there was no tail, so the new chunk also becomes the head
        m_first = chunk;
    } else {
        // link the new chunk after the former tail
        last.next = chunk;
    }

    return true;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
378
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Detaches the specified chunk from the head of the queue.
/// </summary>
/// <param name="first">The chunk the caller observed as the head of the queue.</param>
void RecycleFirstChunk(Chunk first) {
    var successor = first.next;

    // move the head to the successor; if the head is not 'first' anymore
    // someone else has already changed it and there is nothing left to do
    var observedHead = Interlocked.CompareExchange(ref m_first, successor, first);
    if (observedHead != first)
        return;

    // a successor exists, the tail isn't affected
    if (successor != null)
        return;

    // 'first' had no successor when it was read, so it is expected to be the tail as well:
    // try to reset the tail
    var observedTail = Interlocked.CompareExchange(ref m_last, null, first);
    if (observedTail != first) {
        // race
        // someone already updated the tail, restore the pointer to the queue head
        m_first = first;
    }
    // otherwise the tail is updated
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
404
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Discards the current contents of the queue by replacing its chunks with a single new chunk.
/// Does nothing when the queue has no tail chunk.
/// </summary>
public void Clear() {
    // the chunk which starts the new queue
    var fresh = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        // nothing to clear
        if (tail == null)
            return;

        // inconsistency: the head isn't set yet, retry
        if (head == null)
            continue;

        // inconsistency: the head has no successor but it isn't the tail, retry
        if (head.next == null && head != tail)
            continue;

        // Replace the head first. Until the tail is replaced below the head and the tail
        // disagree (fresh.next is null and fresh isn't the tail); per the original author's
        // note this inconsistency is intended to make other threads spin instead of fetching.
        if (Interlocked.CompareExchange(ref m_first, fresh, head) != head)
            continue; // the head has been changed in the meantime, retry

        m_last = fresh;
        return;
    }
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
431
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Replaces the chunks of the queue with a single new chunk and returns the elements
/// read from the chunks which were detached.
/// </summary>
/// <returns>The elements read from the detached chunks, an empty array when the queue has no tail chunk.</returns>
public T[] Drain() {
    // the chunk which starts the new queue
    var fresh = new Chunk(DEFAULT_CHUNK_SIZE);

    while (true) {
        Thread.MemoryBarrier();
        var head = m_first;
        var tail = m_last;

        // there are no chunks, nothing to drain
        if (tail == null)
            return new T[0];

        // inconsistency: the head isn't set yet, retry
        if (head == null)
            continue;

        // inconsistency: the head has no successor but it isn't the tail, retry
        if (head.next == null && head != tail)
            continue;

        // Replace the head first. Until the tail is replaced below the head and the tail
        // disagree (fresh.next is null and fresh isn't the tail); per the original author's
        // note this inconsistency is intended to make other threads spin instead of fetching.
        if (Interlocked.CompareExchange(ref m_first, fresh, head) != head)
            continue; // the head has been changed in the meantime, retry

        // take the actual tail at the moment of the replacement, it marks the end of the detached chain
        var detachedTail = Interlocked.Exchange(ref m_last, fresh);

        return ReadChunks(head, detachedTail);
    }
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
458
128
6241bff0cd64 Added Signal class a lightweight alternative to ManualResetEvent
cin
parents: 127
diff changeset
/// <summary>
/// Reads the elements from a chain of chunks, starting at <paramref name="chunk"/>
/// and stopping after <paramref name="last"/> has been read.
/// </summary>
/// <returns>The elements which were read, in the order they were obtained.</returns>
/// <param name="chunk">The first chunk of the chain, may be <c>null</c>.</param>
/// <param name="last">The chunk at which the reading stops.</param>
static T[] ReadChunks(Chunk chunk, object last) {
    var items = new List<T>();
    var scratch = new T[DEFAULT_CHUNK_SIZE];

    var current = chunk;
    while (current != null) {
        // ensure all write operations on the chunk are complete
        current.Commit();

        // The regular dequeue operation is used to read the chunk, since some clients
        // may still be completing their own dequeue operations on it; such clients
        // most likely won't get results.
        int count;
        bool recycle;
        while (current.TryDequeueBatch(scratch, 0, scratch.Length, out count, out recycle))
            items.AddRange(new ArraySegmentCollection(scratch, 0, count));

        // the end of the chain is reached
        if (current == last)
            break;

        // this isn't the last chunk, wait until the link to the next one becomes visible
        while (current.next == null)
            Thread.MemoryBarrier();

        current = current.next;
    }

    return items.ToArray();
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
485
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
486 struct ArraySegmentCollection : ICollection<T> {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
487 readonly T[] m_data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
488 readonly int m_offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
489 readonly int m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
490
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
491 public ArraySegmentCollection(T[] data, int offset, int length) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
492 m_data = data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
493 m_offset = offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
494 m_length = length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
495 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
496
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
497 #region ICollection implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
498
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
499 public void Add(T item) {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
500 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
501 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
502
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
503 public void Clear() {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
504 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
505 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
506
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
507 public bool Contains(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
508 return false;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
509 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
510
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
511 public void CopyTo(T[] array, int arrayIndex) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
512 Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
513 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
514
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
515 public bool Remove(T item) {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
516 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
517 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
518
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
519 public int Count {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
520 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
521 return m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
522 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
523 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
524
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
525 public bool IsReadOnly {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
526 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
527 return true;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
528 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
529 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
530
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
531 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
532
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
533 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
534
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Lazily yields <c>m_length</c> elements of <c>m_data</c>, beginning at
/// index <c>m_offset</c>.
/// </summary>
/// <returns>An iterator over the elements in index order.</returns>
public IEnumerator<T> GetEnumerator() {
    int index = m_offset;
    // The upper bound is re-read on every step, exactly like a for-loop condition.
    while (index < m_length + m_offset) {
        yield return m_data[index];
        index++;
    }
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
539
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
540 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
541
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
542 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
543
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
/// <summary>
/// Non-generic enumeration entry point; forwards to the generic
/// <c>GetEnumerator()</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
547
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
548 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
549 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
550
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
551 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
552
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Forward-only enumerator that walks a linked list of <see cref="Chunk"/>
/// objects, starting from the chunk passed to the constructor.
/// </summary>
class Enumerator : IEnumerator<T> {
    // Chunk currently being read; null once enumeration has finished.
    Chunk m_current;

    // Position inside m_current; -1 until MoveNext() is called for the first time.
    int m_pos = -1;

    /// <summary>
    /// Creates an enumerator positioned before the first element of the given chunk.
    /// </summary>
    /// <param name="fisrt">The chunk to start from; may be null, in which case
    /// <see cref="MoveNext"/> immediately returns false.</param>
    public Enumerator(Chunk fisrt) {
        m_current = fisrt;
    }

    #region IEnumerator implementation

    /// <summary>
    /// Advances to the next element.
    /// </summary>
    /// <returns>
    /// <c>true</c> when the enumerator now points at an element; <c>false</c>
    /// when there is no current chunk or the end has been reached.
    /// </returns>
    public bool MoveNext() {
        if (m_current == null) {
            return false;
        }

        // The very first step starts at the chunk's Low; later steps move one forward.
        m_pos = m_pos == -1 ? m_current.Low : m_pos + 1;

        if (m_pos != m_current.Hi) {
            return true;
        }

        // Position reached Hi. The linked chunk is followed only when the
        // position also equals Size; otherwise the enumeration ends here.
        // NOTE(review): presumably Hi == Size means the chunk is completely
        // filled - confirm against Chunk.
        if (m_pos == m_current.Size) {
            m_current = m_current.next;
        } else {
            m_current = null;
        }

        // Reading of a following chunk starts at position 0.
        m_pos = 0;

        return m_current != null;
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown on every call.</exception>
    public void Reset() {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Non-generic view of <see cref="Current"/>.
    /// </summary>
    object IEnumerator.Current {
        get { return Current; }
    }

    /// <summary>
    /// Gets the element at the current position via <c>Chunk.GetAt</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when MoveNext() has not been called yet or there is no current chunk.
    /// </exception>
    public T Current {
        get {
            bool positioned = m_pos != -1 && m_current != null;
            if (!positioned) {
                throw new InvalidOperationException();
            }
            return m_current.GetAt(m_pos);
        }
    }

    #endregion

    #region IDisposable implementation

    /// <summary>
    /// Does nothing; the method body is empty.
    /// </summary>
    public void Dispose() {
    }

    #endregion
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
616
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Creates an enumerator that starts at the chunk referenced by <c>m_first</c>
/// at the moment of the call.
/// </summary>
/// <returns>A new <c>Enumerator</c> instance.</returns>
public IEnumerator<T> GetEnumerator() {
    Chunk head = m_first;
    return new Enumerator(head);
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
620
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
621 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
622
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
623 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
624
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
/// <summary>
/// Non-generic enumeration entry point; forwards to the generic
/// <c>GetEnumerator()</c>.
/// </summary>
IEnumerator IEnumerable.GetEnumerator() {
    IEnumerator<T> typed = GetEnumerator();
    return typed;
}
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
628
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
629 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
630 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
631 }