Mercurial > pub > ImplabNet
annotate Implab.Test/AsyncTests.cs @ 159:5558e43c79bb v2
minor fix
author | cin |
---|---|
date | Thu, 18 Feb 2016 16:06:14 +0300 |
parents | ec91a6dfa5b3 |
children | 8200ab154c8a |
rev | line source |
---|---|
77 | 1 using System; |
2 using System.Reflection; | |
3 using System.Threading; | |
4 using Implab.Parallels; | |
5 | |
6 #if MONO | |
7 | |
8 using NUnit.Framework; | |
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; | |
145 | 10 using TestMethodAttribute = NUnit.Framework.TestAttribute; |
77 | 11 |
12 #else | |
13 | |
14 using Microsoft.VisualStudio.TestTools.UnitTesting; | |
15 | |
16 #endif | |
17 | |
18 namespace Implab.Test { | |
19 [TestClass] | |
20 public class AsyncTests { | |
21 [TestMethod] | |
// Registers a success handler on a pending promise, resolves the promise
// with 100 and expects the handler to have stored that value.
public void ResolveTest() {
    var promise = new Promise<int>();
    int observed = -1;

    promise.Then(value => observed = value);
    promise.Resolve(100);

    Assert.AreEqual(100, observed);
}
30 | |
31 [TestMethod] | |
// Rejects a promise that has both a success and an error handler attached
// and expects only the error handler to run: the success handler must leave
// `res` untouched and the error handler must receive the rejection reason.
public void RejectTest() {
    int res = -1;
    Exception err = null;

    var p = new Promise<int>();
    p.Then(
        x => res = x,
        e => {
            err = e;
            return -2;
        }
    );
    p.Reject(new ApplicationException("error"));

    // expected value goes first so that a failure message reads correctly
    Assert.AreEqual(-1, res);
    // fail with a clear assertion instead of a NullReferenceException
    // when the error handler has not been invoked
    Assert.IsNotNull(err);
    Assert.AreEqual("error", err.Message);
}
50 | |
51 [TestMethod] | |
// Cancels a promise, attaches a cancellation handler that throws and expects
// Join() on the derived promise to raise an ApplicationException whose inner
// exception carries the message thrown by the handler.
public void CancelExceptionTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    var derived = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    try {
        derived.Join();
        // Join() must not return normally
        Assert.Fail();
    } catch (ApplicationException failure) {
        Assert.AreEqual("CANCELLED", failure.InnerException.Message);
    }
}
68 | |
69 [TestMethod] | |
// Cancels a promise, lets the cancellation handler throw and then recovers
// in the next link of the chain: the error handler yields `true`, which is
// the value Join() is expected to return.
public void ContinueOnCancelTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    // first link: the cancellation handler fails with an exception
    var failing = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    // second link: the error handler turns the failure into a value
    var recovered = failing.Then(x => x, e => true);

    Assert.AreEqual(true, recovered.Join());
}
82 | |
83 [TestMethod] | |
// Resolves a promise with 100 and expects Join() to return that value.
public void JoinSuccessTest() {
    var p = new Promise<int>();
    p.Resolve(100);

    // expected value goes first so that a failure message reads correctly
    Assert.AreEqual(100, p.Join());
}
89 | |
90 [TestMethod] | |
// Rejects a promise and expects Join() to throw a TargetInvocationException
// whose inner exception is the rejection reason.
public void JoinFailTest() {
    var p = new Promise<int>();
    p.Reject(new ApplicationException("failed"));

    TargetInvocationException caught = null;
    try {
        p.Join();
    } catch (TargetInvocationException err) {
        caught = err;
    } catch (Exception err) {
        Assert.Fail("Got wrong exception: " + err.GetType());
    }

    // report "did not throw" separately from "threw the wrong exception"
    Assert.IsNotNull(caught, "Join() was expected to throw");
    Assert.AreEqual("failed", caught.InnerException.Message);
}
104 | |
105 [TestMethod] | |
// Maps the result of an int promise to its string form through Then() and
// expects Join() on the mapped promise to return "100".
public void MapTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x.ToString());
    p.Resolve(100);

    // expected value goes first so that a failure message reads correctly
    Assert.AreEqual("100", p2.Join());
}
114 | |
115 [TestMethod] | |
// Rejects a promise whose error handler returns 101 and expects Join() on
// the derived promise to return that replacement value.
public void FixErrorTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x, e => 101);

    p.Reject(new Exception());

    // expected value goes first so that a failure message reads correctly
    Assert.AreEqual(101, p2.Join());
}
125 | |
126 [TestMethod] | |
// Chains an int promise to a handler that returns an already resolved
// string promise and expects Join() on the chained promise to return "100".
public void ChainTest() {
    var p1 = new Promise<int>();

    var p3 = p1.Chain(x => {
        var p2 = new Promise<string>();
        p2.Resolve(x.ToString());
        return p2;
    });

    p1.Resolve(100);

    // expected value goes first so that a failure message reads correctly
    Assert.AreEqual("100", p3.Join());
}
140 | |
141 [TestMethod] | |
// Chains a promise to a handler that returns an already rejected promise,
// resolves the outer promise and asserts that IsResolved is true on the
// chained promise.
// NOTE(review): presumably IsResolved means "completed in any way" here,
// since the inner promise is rejected - confirm against Promise.
public void ChainFailTest() {
    var outer = new Promise<int>();

    var chained = outer.Chain(value => {
        var inner = new Promise<string>();
        inner.Reject(new Exception("DIE!!!"));
        return inner;
    });

    outer.Resolve(100);

    Assert.IsTrue(chained.IsResolved);
}
155 | |
156 [TestMethod] | |
// Runs a job through AsyncPool.Invoke that reports the managed thread id it
// executes on and expects that id to differ from the calling thread's id.
public void PoolTest() {
    var callerThreadId = Thread.CurrentThread.ManagedThreadId;
    var job = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);

    Assert.AreNotEqual(callerThreadId, job.Join());
}
163 | |
164 [TestMethod] | |
// Checks how WorkerPool.PoolSize changes under load: 5 right after
// construction, still 5 after three blocking jobs, and 10 after a hundred
// more blocking jobs have been queued.
// NOTE(review): the constructor arguments (5, 10, 1) are presumably
// min threads, max threads and a queue threshold - confirm against WorkerPool.
public void WorkerPoolSizeTest() {
    // `using` guarantees the pool is disposed even when an assertion fails;
    // otherwise the workers would stay blocked in their very long sleeps
    using (var pool = new WorkerPool(5, 10, 1)) {
        Assert.AreEqual(5, pool.PoolSize);

        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        Assert.AreEqual(5, pool.PoolSize);

        for (int i = 0; i < 100; i++)
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        // give the pool some time to react to the queued jobs
        Thread.Sleep(200);
        Assert.AreEqual(10, pool.PoolSize);
    }
}
183 | |
184 [TestMethod] | |
// Queues 1000 small jobs on a WorkerPool, adds each job's result to a shared
// counter and expects the counter to equal the number of jobs once every
// job has signalled completion.
public void WorkerPoolCorrectTest() {
    // `using` guarantees the pool is disposed even when the assertion fails
    using (var pool = new WorkerPool(0, 1000, 100)) {
        const int iterations = 1000;
        int pending = iterations;
        var stop = new ManualResetEvent(false);

        var count = 0;
        for (int i = 0; i < iterations; i++) {
            pool
                .Invoke(() => 1)
                .Then(x => Interlocked.Add(ref count, x))
                .Then(x => Math.Log10(x))
                .On(() => {
                    // use the value returned by the atomic decrement rather
                    // than re-reading the shared counter afterwards
                    if (Interlocked.Decrement(ref pending) == 0)
                        stop.Set();
                }, PromiseEventType.All);
        }

        stop.WaitOne();

        Assert.AreEqual(iterations, count);
        Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
    }
}
212 | |
213 [TestMethod] | |
// Expects PoolSize to be 5 for a fresh WorkerPool(5, 20) and 0 half a second
// after Dispose(); a second Dispose() call is made at the end as well.
public void WorkerPoolDisposeTest() {
    var workers = new WorkerPool(5, 20);
    Assert.AreEqual(5, workers.PoolSize);

    workers.Dispose();
    // wait before inspecting the pool size again
    Thread.Sleep(500);
    Assert.AreEqual(0, workers.PoolSize);

    // disposing an already disposed pool
    workers.Dispose();
}
222 | |
223 [TestMethod] | |
// Exercises MTQueue<int>: first single-threaded (dequeue of one item,
// dequeue from an empty queue, FIFO order of 1000 items), then with
// concurrent writer and reader threads, expecting the readers to collect
// every item the writers produced.
public void MTQueueTest() {
    var queue = new MTQueue<int>();
    int res;

    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // a failed dequeue must be reported as such instead of comparing
        // against the value left over from the previous iteration
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int itemsPerWriter = 10000;
    const int writersCount = 10;
    const int readersCount = 10;

    // the counters are set up front so that a thread finishing early cannot
    // observe a count that does not yet include the threads still to start
    int writers = writersCount;
    int readers = readersCount;
    var stop = new ManualResetEvent(false);
    int total = 0;

    for (int i = 0; i < writersCount; i++) {
        AsyncPool
            .RunThread(() => {
                for (int ii = 0; ii < itemsPerWriter; ii++) {
                    queue.Enqueue(1);
                }
                return 1;
            })
            .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
    }

    for (int i = 0; i < readersCount; i++) {
        AsyncPool
            .RunThread(() => {
                int t;
                bool writersDone;
                do {
                    // take the snapshot BEFORE draining: if the writers were
                    // already done at this point, the drain below sees every
                    // item; checking after the drain could miss items
                    // enqueued between the last failed dequeue and the check
                    writersDone = Interlocked.CompareExchange(ref writers, 0, 0) == 0;
                    while (queue.TryDequeue(out t))
                        Interlocked.Add(ref total, t);
                } while (!writersDone);
                return 1;
            })
            .On(() => {
                // only the reader that brings the counter to zero signals
                if (Interlocked.Decrement(ref readers) == 0)
                    stop.Set();
            }, PromiseEventType.All);
    }

    stop.WaitOne();

    Assert.AreEqual(itemsPerWriter * writersCount, total);
}
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
283 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
284 [TestMethod] |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
// Exercises AsyncQueue<int>: first single-threaded (dequeue of one item,
// dequeue from an empty queue, FIFO order of 1000 items), then with two
// writer threads (enqueueing 1s and 2s) and two reader threads, expecting
// the readers' sums to add up to count * 3.
public void AsyncQueueTest() {
    var queue = new AsyncQueue<int>();
    int res;

    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // a failed dequeue must be reported as such instead of comparing
        // against the value left over from the previous iteration
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int count = 10000000;

    // each sum is written by exactly one reader thread and read only after
    // Join() below has returned
    int res1 = 0, res2 = 0;
    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(2);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            int temp;
            int i = 0;
            // each reader takes exactly `count` items, i.e. half of the total
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res1 += temp;
                }
            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            int temp;
            int i = 0;
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res2 += temp;
                }
            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(count * 3, res1 + res2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        res1,
        res2,
        res1 + res2,
        count
    );
}
353 | |
354 [TestMethod] | |
// Two writers push batches of 29 items (all 1s and all 2s respectively)
// through EnqueueRange while two readers pull ranges of up to 111 items with
// TryDequeueRange; the readers' sums are expected to add up to
// wBatch * wCount * 3.
public void AsyncQueueBatchTest() {
    const int writeBatch = 29;
    const int writesPerWriter = 400000;
    const int expectedItems = writeBatch * writesPerWriter * 2;
    const int expectedSum = writeBatch * writesPerWriter * 3;
    const int readBatch = 111;

    var queue = new AsyncQueue<int>();

    // each sum belongs to one reader; `consumed` is shared by both readers
    int sum1 = 0, sum2 = 0;
    int consumed = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var block = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++)
                block[k] = 1;

            for (int n = 0; n < writesPerWriter; n++)
                queue.EnqueueRange(block, 0, writeBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var block = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++)
                block[k] = 2;

            for (int n = 0; n < writesPerWriter; n++)
                queue.EnqueueRange(block, 0, writeBatch);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];

            while (consumed < expectedItems) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum1 += chunk[k];
                Interlocked.Add(ref consumed, got);
            }

            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];

            while (consumed < expectedItems) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, readBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                Interlocked.Add(ref consumed, got);
            }

            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(expectedSum, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        expectedItems
    );
}
431 | |
432 [TestMethod] | |
122
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
// Three writers push batches of 31 items (all 1s, all 2s and all 3s) through
// EnqueueRange while a single reader pulls chunks of up to 1024 items with
// TryDequeueChunk and tracks the average chunk size; the collected sum is
// expected to equal wBatch * wCount * 6.
public void AsyncQueueChunkDequeueTest() {
    const int writeBatch = 31;
    const int writesPerWriter = 200000;
    const int expectedItems = writeBatch * writesPerWriter * 3;
    const int expectedSum = writeBatch * writesPerWriter * 6;
    const int readBatch = 1024;

    var queue = new AsyncQueue<int>();

    // sum1 is never modified by any thread here; only sum2 is accumulated
    int sum1 = 0, sum2 = 0;
    int consumed = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var block = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++)
                block[k] = 1;

            for (int n = 0; n < writesPerWriter; n++)
                queue.EnqueueRange(block, 0, writeBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var block = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++)
                block[k] = 2;

            for (int n = 0; n < writesPerWriter; n++)
                queue.EnqueueRange(block, 0, writeBatch);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var block = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++)
                block[k] = 3;

            for (int n = 0; n < writesPerWriter; n++)
                queue.EnqueueRange(block, 0, writeBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];
            int chunkNo = 1;
            double avgChunk = 0;

            while (consumed < expectedItems) {
                int got;
                if (!queue.TryDequeueChunk(chunk, 0, readBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                Interlocked.Add(ref consumed, got);

                // running average over the chunks received so far
                avgChunk = avgChunk * (chunkNo - 1) / chunkNo + got / (double)chunkNo;
                chunkNo++;
            }

            Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, avgChunk);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(expectedSum, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        expectedItems
    );
}
507 | |
508 [TestMethod] | |
// Three writers fill the queue with 1s (two through EnqueueRange in batches
// of 11, one through single Enqueue calls) while two readers repeatedly call
// Drain() and sum whatever it returns; the readers' sums are expected to add
// up to the number of items written.
public void AsyncQueueDrainTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 11;
    const int wCount = 200000;
    const int total = wBatch * wCount * 3;
    // every item is 1, so the expected sum equals the item count
    const int summ = wBatch * wCount * 3;

    // each sum belongs to one reader; `read` is shared by both readers
    int r1 = 0, r2 = 0;
    int read = 0;

    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            for (int i = 0; i < wCount * wBatch; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r1 += buffer[i];
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r2 += buffer[i];
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(summ, r1 + r2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        r1,
        r2,
        r1 + r2,
        total
    );
}
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
610 |
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
611 [TestMethod] |
// Maps 100000 random doubles through ParallelMap (second argument 4) and
// compares every result with the same expression evaluated sequentially.
public void ParallelMapTest() {
    const int size = 100000;

    var random = new Random();
    var inputs = new double[size];
    for (int k = 0; k < size; k++)
        inputs[k] = random.NextDouble();

    var mapStarted = Environment.TickCount;
    var outputs = inputs.ParallelMap(x => Math.Sin(x*x), 4).Join();
    Console.WriteLine("Map complete in {0} ms", Environment.TickCount - mapStarted);

    var verifyStarted = Environment.TickCount;
    for (int k = 0; k < size; k++) {
        var expected = Math.Sin(inputs[k] * inputs[k]);
        Assert.AreEqual(expected, outputs[k]);
    }
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - verifyStarted);
}
632 | |
633 [TestMethod] | |
// Maps 10000 random doubles through ChainedMap (second argument 4), where
// each element is computed by a job invoked on a WorkerPool, and compares
// every result with the same expression evaluated sequentially.
public void ChainedMapTest() {
    using (var pool = new WorkerPool()) {
        const int size = 10000;

        var random = new Random();
        var inputs = new double[size];
        for (int k = 0; k < size; k++)
            inputs[k] = random.NextDouble();

        var mapStarted = Environment.TickCount;
        var outputs = inputs
            .ChainedMap(
                // Analysis disable once AccessToDisposedClosure
                x => pool.Invoke(
                    () => Math.Sin(x * x)
                ),
                4
            )
            .Join();
        Console.WriteLine("Map complete in {0} ms", Environment.TickCount - mapStarted);

        var verifyStarted = Environment.TickCount;
        for (int k = 0; k < size; k++) {
            var expected = Math.Sin(inputs[k] * inputs[k]);
            Assert.AreEqual(expected, outputs[k]);
        }
        Console.WriteLine("Verified in {0} ms", Environment.TickCount - verifyStarted);
        Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
    }
}
665 | |
666 [TestMethod] | |
// Sums 100000 random integers with ParallelForEach (second argument 4) using
// an atomic add and compares the outcome with a plain sequential sum.
public void ParallelForEachTest() {
    const int size = 100000;

    var random = new Random();
    var inputs = new int[size];
    for (int k = 0; k < size; k++)
        inputs[k] = (int)(random.NextDouble() * 100);

    int parallelSum = 0;

    var started = Environment.TickCount;
    inputs.ParallelForEach(x => Interlocked.Add(ref parallelSum, x), 4).Join();
    Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - started, parallelSum);

    int sequentialSum = 0;

    started = Environment.TickCount;
    foreach (var item in inputs)
        sequentialSum += item;
    Assert.AreEqual(sequentialSum, parallelSum);
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
}
692 | |
693 [TestMethod] | |
// Builds the chain op1 (async, 200 ms) => op2 (async, 200 ms) => op3 (sync
// map), waits for op1, cancels the whole chain and then checks which of the
// three cancellation callbacks have fired: not the one on op1, but the ones
// on the inner operation and on the chain itself.
public void ComplexCase1Test() {
    var cancelled = new bool[3];

    var first = PromiseHelper
        .Sleep(200, "Alan")
        .On(() => cancelled[0] = true, PromiseEventType.Cancelled);

    var chain = first
        .Chain(name =>
            PromiseHelper
                .Sleep(200, "Hi, " + name)
                .Then(text => text)
                .On(() => cancelled[1] = true, PromiseEventType.Cancelled)
        )
        .On(() => cancelled[2] = true, PromiseEventType.Cancelled);

    // let the first step finish before the chain is cancelled
    first.Join();
    chain.Cancel();

    try {
        Assert.AreEqual(chain.Join(), "Hi, Alan");
        Assert.Fail("Shouldn't get here");
    } catch (OperationCanceledException) {
        // Join() on the cancelled chain is expected to end up here
    }

    Assert.IsFalse(cancelled[0]);
    Assert.IsTrue(cancelled[1]);
    Assert.IsTrue(cancelled[2]);
}
722 | |
/// <summary>
/// Starts a long inner operation from a <c>Then</c> handler, schedules its cancellation from a
/// second, shorter operation and then joins the outer promise.
/// </summary>
[TestMethod]
public void ChainedCancel1Test() {
    // When a chained asynchronous operation is cancelled, the whole promise must
    // complete with an OperationCanceledException.
    var outer = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Then(x => {
            // start two asynchronous operations
            var hem = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
            // the second operation cancels the first one before it completes
            var stopper = PromiseHelper.Sleep(100, "HAL, STOP!");
            stopper.Then(hem.Cancel);
            return hem;
        });

    try {
        outer.Join();
        // NOTE(review): there is no Assert.Fail here, so the test also passes when
        // Join does not throw at all - confirm that this is intended.
    } catch (TargetInvocationException err) {
        var causedByCancel = err.InnerException is OperationCanceledException;
        Assert.IsTrue(causedByCancel);
    }
}
744 | |
/// <summary>
/// Cancels an outer chain once its nested operation has started and checks that the nested
/// operation reports cancellation instead of running to completion.
/// </summary>
[TestMethod]
public void ChainedCancel2Test() {
    // When a promise chain is cancelled, the nested operations must be cancelled as well.
    var innerCancelled = new Promise<bool>();
    var innerStarted = new Signal();

    var greeting = PromiseHelper.Sleep(1, "Hi, HAL!");
    var chain = greeting.Chain(() => {
        innerStarted.Set();

        // start the nested asynchronous operation; it resolves the probe with
        // false if it manages to run to completion
        var inner = PromiseHelper
            .Sleep(2000, "HEM ENABLED!!!")
            .Then(() => innerCancelled.Resolve(false));

        // ... and with true if it gets cancelled
        inner.On(() => innerCancelled.Resolve(true), PromiseEventType.Cancelled);

        return inner;
    });

    // Cancel only after the nested operation has actually been started.
    innerStarted.Wait();
    chain.Cancel();

    try {
        chain.Join();
        Assert.Fail();
    } catch (OperationCanceledException) {
        // Expected outcome: the chain was cancelled.
    }

    Assert.IsTrue(innerCancelled.Join());
}
136 | 775 |
/// <summary>
/// Runs two readers and one writer against a single <c>SharedLock</c> on separate threads.
/// Both readers wait for each other while holding the shared lock, the second reader then
/// upgrades to a writer, and the writer takes the lock exclusively. The collected log is
/// printed to the console whatever the outcome.
/// </summary>
[TestMethod]
public void SharedLockTest() {
    var sharedLock = new SharedLock();
    int shared = 0;
    int exclusive = 0;
    var bothReadersIn = new Signal();
    var log = new AsyncQueue<string>();

    // Reader #1: takes the shared lock and waits until both readers hold it.
    Action reader1 = () => {
        log.Enqueue("Reader #1 started");
        try {
            sharedLock.LockShared();
            log.Enqueue("Reader #1 lock got");
            if (Interlocked.Increment(ref shared) == 2) {
                bothReadersIn.Set();
            }
            bothReadersIn.Wait();
            log.Enqueue("Reader #1 finished");
            Interlocked.Decrement(ref shared);
        } finally {
            sharedLock.Release();
            log.Enqueue("Reader #1 lock released");
        }
    };

    // Reader #2: same as reader #1, but afterwards upgrades its lock and checks
    // that it is the only holder.
    Action reader2 = () => {
        log.Enqueue("Reader #2 started");

        try {
            sharedLock.LockShared();
            log.Enqueue("Reader #2 lock got");

            if (Interlocked.Increment(ref shared) == 2) {
                bothReadersIn.Set();
            }
            bothReadersIn.Wait();
            log.Enqueue("Reader #2 upgrading to writer");
            Interlocked.Decrement(ref shared);
            sharedLock.Upgrade();
            log.Enqueue("Reader #2 upgraded");

            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
            Assert.AreEqual(0, shared);
            log.Enqueue("Reader #2 finished");
            Interlocked.Decrement(ref exclusive);
        } finally {
            sharedLock.Release();
            log.Enqueue("Reader #2 lock released");
        }
    };

    // Writer #1: takes the lock exclusively and checks that nobody else is in the exclusive section.
    Action writer1 = () => {
        log.Enqueue("Writer #1 started");
        try {
            sharedLock.LockExclusive();
            log.Enqueue("Writer #1 got the lock");
            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
            Interlocked.Decrement(ref exclusive);
            log.Enqueue("Writer #1 is finished");
        } finally {
            sharedLock.Release();
            log.Enqueue("Writer #1 lock released");
        }
    };

    try {
        // Join is given 1000, presumably a timeout in milliseconds - TODO confirm.
        AsyncPool.RunThread(reader1, reader2, writer1).Bundle().Join(1000);
        log.Enqueue("Done");
    } catch (Exception error) {
        log.Enqueue(error.Message);
        throw;
    } finally {
        // Always dump the collected log to the console.
        foreach (var message in log) {
            Console.WriteLine(message);
        }
    }
}
151 | 848 |
849 #if NET_4_5 | |
850 | |
/// <summary>
/// Checks that a promise can be awaited: an already resolved <c>Promise&lt;int&gt;</c> must
/// yield its value through <c>await</c>.
/// </summary>
/// <returns>
/// A task representing the test run. The method returns <c>Task</c> rather than <c>void</c> so
/// the test runner can await it and observe assertion failures raised after the <c>await</c>.
/// </returns>
[TestMethod]
public async System.Threading.Tasks.Task TaskInteropTest() {
    var promise = new Promise<int>();
    promise.Resolve(10);
    var res = await promise;

    Assert.AreEqual(10, res);
}
859 | |
860 #endif | |
77 | 861 } |
862 } | |
863 |