annotate Implab/Parallels/AsyncQueue.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents 8d5de4eb9c2c
children 8dd666e6b6bf
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
1 using System.Threading;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
2 using System.Collections.Generic;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
3 using System;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
4 using System.Collections;
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
5 using System.Diagnostics;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
6
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
7 namespace Implab.Parallels {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
8 public class AsyncQueue<T> : IEnumerable<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
9 class Chunk {
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
10 public volatile Chunk next;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
11
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
12 volatile int m_low;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
13 volatile int m_hi;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
14 volatile int m_alloc;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
15 readonly int m_size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
16 readonly T[] m_data;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
17
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
18 public Chunk(int size) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
19 m_size = size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
20 m_data = new T[size];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
21 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
22
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
23 public Chunk(int size, T value) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
24 m_size = size;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
25 m_hi = 1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
26 m_alloc = 1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
27 m_data = new T[size];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
28 m_data[0] = value;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
29 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
30
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
31 public Chunk(int size, int allocated) {
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
32 m_size = size;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
33 m_hi = allocated;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
34 m_alloc = allocated;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
35 m_data = new T[size];
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
36 }
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
37
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
38 public void WriteData(T[] data, int offset, int dest, int length) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
39 Array.Copy(data, offset, m_data, dest, length);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
40 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
41
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
42 public int Low {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
43 get { return m_low; }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
44 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
45
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
46 public int Hi {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
47 get { return m_hi; }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
48 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
49
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
50 public int Size {
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
51 get { return m_size; }
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
52 }
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
53
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
54 public bool TryEnqueue(T value) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
55 int alloc;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
56 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
57 alloc = m_alloc;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
58 if (alloc >= m_size)
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
59 return false;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
60 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + 1, alloc));
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
61
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
62 m_data[alloc] = value;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
63
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
64 SpinWait spin = new SpinWait();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
65 // m_hi is volatile
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
66 while (alloc != m_hi) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
67 // spin wait for commit
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
68 spin.SpinOnce();
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
69 }
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
70 m_hi = alloc + 1;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
71
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
72 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
73 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
74
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
75 /// <summary>
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
76 /// Prevents from allocating new space in the chunk and waits for all write operations to complete
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
77 /// </summary>
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
78 public void Seal() {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
79 var actual = Math.Min(Interlocked.Exchange(ref m_alloc, m_size), m_size);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
80 SpinWait spin = new SpinWait();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
81 while (m_hi != actual) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
82 spin.SpinOnce();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
83 }
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
84 }
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
85
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
86 public bool TryDequeue(out T value, out bool recycle) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
87 int low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
88 do {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
89 low = m_low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
90 if (low >= m_hi) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
91 value = default(T);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
92 recycle = (low == m_size);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
93 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
94 }
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
95 } while (low != Interlocked.CompareExchange(ref m_low, low + 1, low));
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
96
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
97 recycle = (low + 1 == m_size);
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
98 value = m_data[low];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
99
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
100 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
101 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
102
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
103 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
104 int alloc;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
105 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
106 alloc = m_alloc;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
107 if (alloc >= m_size) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
108 enqueued = 0;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
109 return false;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
110 } else {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
111 enqueued = Math.Min(length, m_size - alloc);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
112 }
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
113 } while (alloc != Interlocked.CompareExchange(ref m_alloc, alloc + enqueued, alloc));
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
114
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
115 Array.Copy(batch, offset, m_data, alloc, enqueued);
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
116
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
117 SpinWait spin = new SpinWait();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
118 while (alloc != m_hi) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
119 spin.SpinOnce();
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
120 }
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
121
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
122 m_hi = alloc + enqueued;
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
123 return true;
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
124 }
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
125
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
126 public bool TryDequeueBatch(T[] buffer, int offset, int length, out int dequeued, out bool recycle) {
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
127 int low, hi, batchSize;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
128
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
129 do {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
130 low = m_low;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
131 hi = m_hi;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
132 if (low >= hi) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
133 dequeued = 0;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
134 recycle = (low == m_size);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
135 return false;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
136 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
137 batchSize = Math.Min(hi - low, length);
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
138 } while (low != Interlocked.CompareExchange(ref m_low, low + batchSize, low));
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
139
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
140 dequeued = batchSize;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
141 recycle = (low + batchSize == m_size);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
142 Array.Copy(m_data, low, buffer, offset, batchSize);
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
143
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
144 return true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
145 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
146
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
147 public T GetAt(int pos) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
148 return m_data[pos];
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
149 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
150 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
151
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
152 public const int DEFAULT_CHUNK_SIZE = 32;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
153 public const int MAX_CHUNK_SIZE = 256;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
154
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
155 Chunk m_first;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
156 Chunk m_last;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
157
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
158 public AsyncQueue() {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
159 m_first = m_last = new Chunk(DEFAULT_CHUNK_SIZE);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
160 }
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
161
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
162 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
163 /// Adds the specified value to the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
164 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
165 /// <param name="value">Tha value which will be added to the queue.</param>
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
166 public void Enqueue(T value) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
167 var last = m_last;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
168 SpinWait spin = new SpinWait();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
169 while (!last.TryEnqueue(value)) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
170 // try to extend queue
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
171 var chunk = new Chunk(DEFAULT_CHUNK_SIZE, value);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
172 var t = Interlocked.CompareExchange(ref m_last, chunk, last);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
173 if (t == last) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
174 last.next = chunk;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
175 break;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
176 } else {
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
177 last = t;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
178 }
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
179 spin.SpinOnce();
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
180 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
181 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
182
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
183 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
184 /// Adds the specified data to the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
185 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
186 /// <param name="data">The buffer which contains the data to be enqueued.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
187 /// <param name="offset">The offset of the data in the buffer.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
188 /// <param name="length">The size of the data to read from the buffer.</param>
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
189 public void EnqueueRange(T[] data, int offset, int length) {
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
190 if (data == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
191 throw new ArgumentNullException("data");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
192 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
193 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
194 if (length < 1 || offset + length > data.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
195 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
196
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
197 while (length > 0) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
198 var last = m_last;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
199 int enqueued;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
200
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
201 if (last.TryEnqueueBatch(data, offset, length, out enqueued)) {
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
202 length -= enqueued;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
203 offset += enqueued;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
204 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
205
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
206 if (length > 0) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
207 // we have something to enqueue
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
208
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
209 var tail = length % MAX_CHUNK_SIZE;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
210
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
211 var chunk = new Chunk(Math.Max(tail, DEFAULT_CHUNK_SIZE), tail);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
212
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
213 if (last != Interlocked.CompareExchange(ref m_last, chunk, last))
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
214 continue; // we wasn't able to catch the writer, roundtrip
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
215
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
216 // we are lucky
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
217 // we can exclusively write our batch, the other writers will continue their work
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
218
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
219 length -= tail;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
220
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
221
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
222 for(var i = 0; i < length; i+= MAX_CHUNK_SIZE) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
223 var node = new Chunk(MAX_CHUNK_SIZE, MAX_CHUNK_SIZE);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
224 node.WriteData(data, offset, 0, MAX_CHUNK_SIZE);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
225 offset += MAX_CHUNK_SIZE;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
226 // fence last.next is volatile
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
227 last.next = node;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
228 last = node;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
229 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
230
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
231 if (tail > 0)
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
232 chunk.WriteData(data, offset, 0, tail);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
234 // fence last.next is volatile
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
235 last.next = chunk;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
236 return;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
237 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
238 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
239 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
240
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
241 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
242 /// Tries to retrieve the first element from the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
243 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
244 /// <returns><c>true</c>, if element is dequeued, <c>false</c> otherwise.</returns>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
245 /// <param name="value">The value of the dequeued element.</param>
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
246 public bool TryDequeue(out T value) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
247 var chunk = m_first;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
248 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
249 bool recycle;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
250
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
251 var result = chunk.TryDequeue(out value, out recycle);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
252
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
253 if (recycle && chunk.next != null) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
254 // this chunk is waste
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
255 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
256 } else {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
257 return result; // this chunk is usable and returned actual result
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
258 }
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
259
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
260 if (result) // this chunk is waste but the true result is always actual
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
261 return true;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
262 } while (true);
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
263 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
264
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
265 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
266 /// Tries to dequeue the specified amount of data from the queue.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
267 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
268 /// <returns><c>true</c>, if data was deuqueued, <c>false</c> otherwise.</returns>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
269 /// <param name="buffer">The buffer to which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
270 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
271 /// <param name="length">The maximum amount of data to be retrieved.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
272 /// <param name="dequeued">The actual amout of the retrieved data.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
273 public bool TryDequeueRange(T[] buffer, int offset, int length, out int dequeued) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
274 if (buffer == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
275 throw new ArgumentNullException("buffer");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
276 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
277 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
278 if (length < 1 || offset + length > buffer.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
279 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
280
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
281 var chunk = m_first;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
282 dequeued = 0;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
283 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
284 bool recycle;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
285 int actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
286 if (chunk.TryDequeueBatch(buffer, offset, length, out actual, out recycle)) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
287 offset += actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
288 length -= actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
289 dequeued += actual;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
290 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
291
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
292 if (recycle && chunk.next != null) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
293 // this chunk is waste
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
294 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
295 } else {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
296 chunk = null;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
297 }
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
298
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
299 if (length == 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
300 return true;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
301 } while (chunk != null);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
302
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
303 return dequeued != 0;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
304 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
305
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
306 /// <summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
307 /// Tries to dequeue all remaining data in the first chunk.
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
308 /// </summary>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
309 /// <returns><c>true</c>, if data was dequeued, <c>false</c> otherwise.</returns>
122
0c8685c8b56b minor fixes and improvements of AsyncQueue, additional tests
cin
parents: 121
diff changeset
310 /// <param name="buffer">The buffer to which the data will be written.</param>
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
311 /// <param name="offset">The offset in the buffer at which the data will be written.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
312 /// <param name="length">Tha maximum amount of the data to be dequeued.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
313 /// <param name="dequeued">The actual amount of the dequeued data.</param>
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
314 public bool TryDequeueChunk(T[] buffer, int offset, int length, out int dequeued) {
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
315 if (buffer == null)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
316 throw new ArgumentNullException("buffer");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
317 if (offset < 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
318 throw new ArgumentOutOfRangeException("offset");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
319 if (length < 1 || offset + length > buffer.Length)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
320 throw new ArgumentOutOfRangeException("length");
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
321
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
322 var chunk = m_first;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
323 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
324 bool recycle;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
325 chunk.TryDequeueBatch(buffer, offset, length, out dequeued, out recycle);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
326
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
327 if (recycle && chunk.next != null) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
328 // this chunk is waste
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
329 chunk = Interlocked.CompareExchange(ref m_first, chunk.next, chunk);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
330 } else {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
331 chunk = null;
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
332 }
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
333
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
334 // if we have dequeued any data, then return
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
335 if (dequeued != 0)
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
336 return true;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
337
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
338 } while (chunk != null);
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
339
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
340 return false;
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
341 }
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
342
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
343
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
344 public void Clear() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
345 // start the new queue
125
f803565868a4 improved performance of promises
cin
parents: 124
diff changeset
346 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
347 do {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
348 var first = m_first;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
349 if (first.next == null && first != m_last) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
350 continue;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
351 }
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
352
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
353 // here we will create inconsistency which will force others to spin
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
354 // and prevent from fetching. chunk.next = null
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
355 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
356 continue;// inconsistent
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
357
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
358 m_last = chunk;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
359 return;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
360 } while (true);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
361 }
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
362
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
363 public List<T> Drain() {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
364 // start the new queue
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
365 var chunk = new Chunk(DEFAULT_CHUNK_SIZE);
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
366
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
367 do {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
368 var first = m_first;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
369 // first.next is volatile
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
370 if (first.next == null) {
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
371 if (first != m_last)
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
372 continue;
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
373 else if (first.Hi == first.Low)
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
374 return new List<T>();
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
375 }
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
376
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
377 // here we will create inconsistency which will force others to spin
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
378 // and prevent from fetching. chunk.next = null
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
379 if (first != Interlocked.CompareExchange(ref m_first, chunk, first))
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
380 continue;// inconsistent
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
381
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
382 var last = Interlocked.Exchange(ref m_last, chunk);
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
383
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
384 return ReadChunks(first, last);
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
385
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
386 } while (true);
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
387 }
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
388
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
389 static List<T> ReadChunks(Chunk chunk, object last) {
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
390 var result = new List<T>();
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
391 var buffer = new T[MAX_CHUNK_SIZE];
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
392 int actual;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
393 bool recycle;
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
394 SpinWait spin = new SpinWait();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
395 while (chunk != null) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
396 // ensure all write operations on the chunk are complete
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
397 chunk.Seal();
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
398
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
399 // we need to read the chunk using this way
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
400 // since some client still may completing the dequeue
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
401 // operation, such clients most likely won't get results
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
402 while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
403 result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
404
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
405 if (chunk == last) {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
406 chunk = null;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
407 } else {
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
408 while (chunk.next == null)
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
409 spin.SpinOnce();
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
410 chunk = chunk.next;
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 123
diff changeset
411 }
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
412 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
413
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
414 return result;
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
415 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
416
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
417 struct ArraySegmentCollection : ICollection<T> {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
418 readonly T[] m_data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
419 readonly int m_offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
420 readonly int m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
421
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
422 public ArraySegmentCollection(T[] data, int offset, int length) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
423 m_data = data;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
424 m_offset = offset;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
425 m_length = length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
426 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
427
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
428 #region ICollection implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
429
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
430 public void Add(T item) {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
431 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
432 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
433
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
434 public void Clear() {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
435 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
436 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
437
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
438 public bool Contains(T item) {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
439 return false;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
440 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
441
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
442 public void CopyTo(T[] array, int arrayIndex) {
233
d6fe09f5592c Improved AsyncQueue
cin
parents: 227
diff changeset
443 Array.Copy(m_data, m_offset, array, arrayIndex, m_length);
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
444 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
445
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
446 public bool Remove(T item) {
129
471f596b2603 Added SharedLock to synchronization routines
cin
parents: 128
diff changeset
447 throw new NotSupportedException();
123
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
448 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
449
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
450 public int Count {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
451 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
452 return m_length;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
453 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
454 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
455
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
456 public bool IsReadOnly {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
457 get {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
458 return true;
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
459 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
460 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
461
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
462 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
463
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
464 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
465
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
466 public IEnumerator<T> GetEnumerator() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
467 for (int i = m_offset; i < m_length + m_offset; i++)
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
468 yield return m_data[i];
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
469 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
470
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
471 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
472
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
473 #region IEnumerable implementation
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
474
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
475 IEnumerator IEnumerable.GetEnumerator() {
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
476 return GetEnumerator();
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
477 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
478
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
479 #endregion
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
480 }
f4d6ea6969cc async queue improvements
cin
parents: 122
diff changeset
481
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
482 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
483
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
484 class Enumerator : IEnumerator<T> {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
485 Chunk m_current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
486 int m_pos = -1;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
487
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
488 public Enumerator(Chunk fisrt) {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
489 m_current = fisrt;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
490 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
491
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
492 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
493
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
494 public bool MoveNext() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
495 if (m_current == null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
496 return false;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
497
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
498 if (m_pos == -1)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
499 m_pos = m_current.Low;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
500 else
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
501 m_pos++;
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
502
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
503 if (m_pos == m_current.Hi) {
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
504
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
505 m_current = m_pos == m_current.Size ? m_current.next : null;
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
506
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
507 m_pos = 0;
127
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
508
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
509 if (m_current == null)
d86da8d2d4c3 fixed AsyncQueue iterator
cin
parents: 125
diff changeset
510 return false;
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
511 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
512
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
513 return true;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
514 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
515
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
516 public void Reset() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
517 throw new NotSupportedException();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
518 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
519
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
520 object IEnumerator.Current {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
521 get {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
522 return Current;
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
523 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
524 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
525
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
526 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
527
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
528 #region IDisposable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
529
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
530 public void Dispose() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
531 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
532
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
533 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
534
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
535 #region IEnumerator implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
536
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
537 public T Current {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
538 get {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
539 if (m_pos == -1 || m_current == null)
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
540 throw new InvalidOperationException();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
541 return m_current.GetAt(m_pos);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
542 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
543 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
544
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
545 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
546 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
547
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
548 public IEnumerator<T> GetEnumerator() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
549 return new Enumerator(m_first);
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
550 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
551
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
552 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
553
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
554 #region IEnumerable implementation
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
555
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
556 IEnumerator IEnumerable.GetEnumerator() {
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
557 return GetEnumerator();
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
558 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
559
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
560 #endregion
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
561 }
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents:
diff changeset
562 }