Mercurial > pub > ImplabNet
annotate Implab.Test/AsyncTests.cs @ 239:eedf4d834e67 v2
fix
author | cin |
---|---|
date | Wed, 13 Dec 2017 19:54:45 +0300 |
parents | d6fe09f5592c |
children |
rev | line source |
---|---|
77 | 1 using System; |
2 using System.Reflection; | |
3 using System.Threading; | |
4 using Implab.Parallels; | |
5 | |
6 #if MONO | |
7 | |
8 using NUnit.Framework; | |
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; | |
145 | 10 using TestMethodAttribute = NUnit.Framework.TestAttribute; |
77 | 11 |
12 #else | |
13 | |
14 using Microsoft.VisualStudio.TestTools.UnitTesting; | |
15 | |
16 #endif | |
17 | |
18 namespace Implab.Test { | |
19 [TestClass] | |
20 public class AsyncTests { | |
/// <summary>
/// Resolving a promise delivers the value to a handler that was
/// registered with <c>Then</c> before the resolution.
/// </summary>
[TestMethod]
public void ResolveTest() {
    var promise = new Promise<int>();
    int received = -1;

    // The handler is attached first, the value arrives afterwards.
    promise.Then(value => received = value);
    promise.Resolve(100);

    Assert.AreEqual(100, received);
}
30 | |
/// <summary>
/// Rejecting a promise must invoke the error handler passed to <c>Then</c>
/// and must not invoke the success handler.
/// </summary>
[TestMethod]
public void RejectTest() {
    int res = -1;
    Exception err = null;

    var p = new Promise<int>();
    p.Then(
        x => res = x,
        e => {
            err = e;
            return -2;
        }
    );
    p.Reject(new ApplicationException("error"));

    // Expected value goes first so failure messages read correctly.
    Assert.AreEqual(-1, res);
    // Fail with a clear assertion instead of a NullReferenceException
    // if the error handler was never called.
    Assert.IsNotNull(err);
    Assert.AreEqual("error", err.Message);
}
50 | |
/// <summary>
/// A cancellation handler that throws makes <c>Join</c> on the chained
/// promise fail with an <see cref="ApplicationException"/> whose inner
/// exception carries the "CANCELLED" message.
/// </summary>
[TestMethod]
public void CancelExceptionTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    var chained = source.Then(
        x => x,
        null,
        reason => {
            throw new ApplicationException("CANCELLED");
        }
    );

    try {
        chained.Join();
        // Join is expected to throw; reaching this line is a failure.
        Assert.Fail();
    } catch (ApplicationException error) {
        Assert.AreEqual("CANCELLED", error.InnerException.Message);
    }
}
68 | |
/// <summary>
/// An exception thrown from a cancellation handler can be converted back
/// into a value by an error handler further down the chain.
/// </summary>
[TestMethod]
public void ContinueOnCancelTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    // First link: the cancellation handler throws.
    var failing = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    // Second link: the error handler substitutes 'true'.
    var recovered = failing.Then(x => x, e => true);

    Assert.AreEqual(true, recovered.Join());
}
82 | |
/// <summary>
/// <c>Join</c> on an already resolved promise returns the resolved value.
/// </summary>
[TestMethod]
public void JoinSuccessTest() {
    var p = new Promise<int>();
    p.Resolve(100);

    // Expected value first, actual second.
    Assert.AreEqual(100, p.Join());
}
89 | |
/// <summary>
/// <c>Join</c> on a rejected promise must throw a
/// <see cref="TargetInvocationException"/> wrapping the rejection reason.
/// </summary>
[TestMethod]
public void JoinFailTest() {
    var p = new Promise<int>();
    p.Reject(new ApplicationException("failed"));

    TargetInvocationException caught = null;
    try {
        p.Join();
    } catch (TargetInvocationException err) {
        caught = err;
    } catch (Exception err) {
        Assert.Fail("Got wrong exception: " + err.GetType());
    }

    // Checked outside the try block so that "Join did not throw" is
    // reported as such rather than as a wrong exception type.
    Assert.IsNotNull(caught, "Join should have thrown");
    Assert.AreEqual("failed", caught.InnerException.Message);
}
104 | |
/// <summary>
/// <c>Then</c> with a mapping function yields a promise of the mapped value.
/// </summary>
[TestMethod]
public void MapTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x.ToString());
    p.Resolve(100);

    // Expected value first, actual second.
    Assert.AreEqual("100", p2.Join());
}
114 | |
/// <summary>
/// An error handler passed to <c>Then</c> can replace a rejection with
/// a regular value (101 here).
/// </summary>
[TestMethod]
public void FixErrorTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x, e => 101);

    p.Reject(new Exception());

    // Expected value first, actual second.
    Assert.AreEqual(101, p2.Join());
}
125 | |
/// <summary>
/// <c>Chain</c> continues with the promise returned by the callback;
/// the outer result is the value of that inner promise.
/// </summary>
[TestMethod]
public void ChainTest() {
    var p1 = new Promise<int>();

    var p3 = p1.Chain(x => {
        var p2 = new Promise<string>();
        p2.Resolve(x.ToString());
        return p2;
    });

    p1.Resolve(100);

    // Expected value first, actual second.
    Assert.AreEqual("100", p3.Join());
}
140 | |
/// <summary>
/// When the promise returned from a <c>Chain</c> callback is rejected,
/// the outer chained promise must report <c>IsResolved</c>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>IsResolved</c> presumably means "completed in any
/// state" rather than "completed successfully" - confirm against Promise.
/// </remarks>
[TestMethod]
public void ChainFailTest() {
    var source = new Promise<int>();

    var chained = source.Chain(x => {
        var inner = new Promise<string>();
        inner.Reject(new Exception("DIE!!!"));
        return inner;
    });

    source.Resolve(100);

    Assert.IsTrue(chained.IsResolved);
}
155 | |
/// <summary>
/// Work submitted through <c>AsyncPool.Invoke</c> must run on a thread
/// other than the one that submitted it.
/// </summary>
[TestMethod]
public void PoolTest() {
    int callerId = Thread.CurrentThread.ManagedThreadId;

    // The delegate reports the id of the thread it is executed on.
    var worker = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);

    Assert.AreNotEqual(callerId, worker.Join());
}
163 | |
/// <summary>
/// Checks how <c>WorkerPool.PoolSize</c> reacts to load: it starts at 5,
/// is still 5 after three blocking jobs are queued, and is 10 after a
/// hundred more.
/// </summary>
/// <remarks>
/// NOTE(review): the constructor arguments (5, 10, 1) are presumably
/// (min threads, max threads, queue threshold) - confirm against WorkerPool.
/// </remarks>
[TestMethod]
public void WorkerPoolSizeTest() {
    // 'using' guarantees the pool (and its long-sleeping workers) is
    // disposed even when one of the assertions below fails.
    using (var pool = new WorkerPool(5, 10, 1)) {
        Assert.AreEqual(5, pool.PoolSize);

        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        Assert.AreEqual(5, pool.PoolSize);

        for (int i = 0; i < 100; i++)
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        // Give the pool a moment to react to the queued work.
        Thread.Sleep(200);
        Assert.AreEqual(10, pool.PoolSize);
    }
}
183 | |
/// <summary>
/// Runs 1000 small jobs through a <c>WorkerPool</c>, each followed by two
/// <c>Then</c> continuations, and verifies that every job contributed
/// exactly once to the shared counter.
/// </summary>
[TestMethod]
public void WorkerPoolCorrectTest() {
    // 'using' guarantees the pool is disposed even if the assertion fails.
    using (var pool = new WorkerPool(0, 1000, 100)) {
        const int iterations = 1000;
        int pending = iterations;
        var stop = new ManualResetEvent(false);

        var count = 0;
        for (int i = 0; i < iterations; i++) {
            pool
                .Invoke(() => 1)
                .Then(x => Interlocked.Add(ref count, x))
                .Then(x => Math.Log10(x))
                .On(() => {
                    // Use the value returned by Decrement: re-reading
                    // 'pending' afterwards is a separate, non-atomic read.
                    if (Interlocked.Decrement(ref pending) == 0)
                        stop.Set();
                }, PromiseEventType.All);
        }

        stop.WaitOne();

        Assert.AreEqual(iterations, count);
        Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
    }
}
212 | |
/// <summary>
/// After <c>Dispose</c> the pool size must drop from 5 to 0, and a second
/// <c>Dispose</c> call must be harmless.
/// </summary>
[TestMethod]
public void WorkerPoolDisposeTest() {
    var workers = new WorkerPool(5, 20);
    Assert.AreEqual(5, workers.PoolSize);

    workers.Dispose();

    // Wait half a second before checking the pool size.
    Thread.Sleep(500);
    Assert.AreEqual(0, workers.PoolSize);

    // Deliberate second call: disposing twice must not throw.
    workers.Dispose();
}
222 | |
/// <summary>
/// Exercises <c>SimpleAsyncQueue</c>: first single-threaded FIFO behaviour,
/// then ten writer threads and ten reader threads running concurrently.
/// Every enqueued item has the value 1, so the sum of everything read
/// must equal the number of items written.
/// </summary>
[TestMethod]
public void MTQueueTest() {
    var queue = new SimpleAsyncQueue<int>();
    int res;

    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // A failed dequeue must be reported as such, not as a value mismatch.
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int itemsPerWriter = 1000000;
    const int writersCount = 10;
    const int readersCount = 10;

    // Both counters are set before any thread starts, so neither can
    // reach zero while threads are still being launched.
    int writers = writersCount;
    int readers = readersCount;
    var stop = new ManualResetEvent(false);
    int total = 0;
    var ticks = Environment.TickCount;

    for (int i = 0; i < writersCount; i++) {
        AsyncPool
            .RunThread(() => {
                for (int ii = 0; ii < itemsPerWriter; ii++) {
                    queue.Enqueue(1);
                }
                return 1;
            })
            .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
    }

    for (int i = 0; i < readersCount; i++) {
        AsyncPool
            .RunThread(() => {
                int t;
                bool writersDone;
                do {
                    // Sample the writer count BEFORE draining. If it is
                    // already zero, every item has been enqueued and the
                    // drain below cannot miss a late item.
                    writersDone = Interlocked.CompareExchange(ref writers, 0, 0) == 0;
                    while (queue.TryDequeue(out t))
                        Interlocked.Add(ref total, t);
                } while (!writersDone);
                return 1;
            })
            .On(() => {
                // Only the reader that brings the counter to zero signals.
                if (Interlocked.Decrement(ref readers) == 0)
                    stop.Set();
            }, PromiseEventType.All);
    }

    stop.WaitOne();

    Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);

    Assert.AreEqual(itemsPerWriter * writersCount, total);
}
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
286 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
/// <summary>
/// Exercises <c>AsyncQueue</c>: first single-threaded FIFO behaviour, then
/// two writers (enqueueing 1s and 2s) against two readers. Each reader
/// takes exactly <c>count</c> items, so the combined sum must be
/// <c>count * 3</c>.
/// </summary>
[TestMethod]
public void AsyncQueueTest() {
    var queue = new AsyncQueue<int>();
    int res;

    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // A failed dequeue must be reported as such, not as a value mismatch.
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int count = 10000000;

    // res1 and res2 are each written by exactly one reader thread.
    int res1 = 0, res2 = 0;
    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(2);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            int temp;
            int i = 0;
            // Spin until this reader has taken its share of the items.
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res1 += temp;
                }
            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            int temp;
            int i = 0;
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res2 += temp;
                }
            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
        }
    )
        .PromiseAll()
        .Join();

    Assert.AreEqual(count * 3, res1 + res2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        res1,
        res2,
        res1 + res2,
        count
    );
}
356 | |
/// <summary>
/// Two writers push fixed-size batches with <c>EnqueueRange</c> while two
/// readers pull batches with <c>TryDequeueRange</c>; the sum of everything
/// read must match the sum of everything written.
/// </summary>
[TestMethod]
public void AsyncQueueBatchTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 29;
    const int wCount = 400000;
    const int rBatch = 111;
    // Two writers, each producing wBatch * wCount items.
    const int total = wBatch * wCount * 2;
    // Writer #1 emits 1s and writer #2 emits 2s, hence the factor 1 + 2.
    const int summ = wBatch * wCount * 3;

    int sum1 = 0, sum2 = 0;
    int consumed = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var ones = new int[wBatch];
            for (int k = 0; k < ones.Length; k++)
                ones[k] = 1;

            int remaining = wCount;
            while (remaining-- > 0)
                queue.EnqueueRange(ones, 0, ones.Length);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var twos = new int[wBatch];
            for (int k = 0; k < twos.Length; k++)
                twos[k] = 2;

            int remaining = wCount;
            while (remaining-- > 0)
                queue.EnqueueRange(twos, 0, twos.Length);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];

            // Both readers share 'consumed' and stop once all items are taken.
            while (consumed < total) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, chunk.Length, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum1 += chunk[k];
                Interlocked.Add(ref consumed, got);
            }

            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];

            while (consumed < total) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, chunk.Length, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                Interlocked.Add(ref consumed, got);
            }

            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
        }
    )
        .PromiseAll()
        .Join();

    Assert.AreEqual(summ, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        total
    );
}
434 | |
/// <summary>
/// Three writers push batches of 1s, 2s and 3s with <c>EnqueueRange</c>
/// while a single reader pulls them with <c>TryDequeueChunk</c> and tracks
/// the average chunk size; the sum read must match the sum written.
/// </summary>
[TestMethod]
public void AsyncQueueChunkDequeueTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 31;
    const int wCount = 200000;
    const int rBatch = 1024;
    // Three writers, each producing wBatch * wCount items.
    const int total = wBatch * wCount * 3;
    // Values written are 1, 2 and 3, hence the factor 1 + 2 + 3.
    const int summ = wBatch * wCount * 6;

    // sum1 is never written by any thread here; only sum2 accumulates.
    int sum1 = 0, sum2 = 0;
    int consumed = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var ones = new int[wBatch];
            for (int k = 0; k < ones.Length; k++)
                ones[k] = 1;

            int remaining = wCount;
            while (remaining-- > 0)
                queue.EnqueueRange(ones, 0, ones.Length);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var twos = new int[wBatch];
            for (int k = 0; k < twos.Length; k++)
                twos[k] = 2;

            int remaining = wCount;
            while (remaining-- > 0)
                queue.EnqueueRange(twos, 0, twos.Length);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var threes = new int[wBatch];
            for (int k = 0; k < threes.Length; k++)
                threes[k] = 3;

            int remaining = wCount;
            while (remaining-- > 0)
                queue.EnqueueRange(threes, 0, threes.Length);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];
            int samples = 1;
            double avgchunk = 0;

            while (consumed < total) {
                int got;
                if (!queue.TryDequeueChunk(chunk, 0, chunk.Length, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                Interlocked.Add(ref consumed, got);

                // Incremental mean of the chunk sizes seen so far.
                avgchunk = avgchunk*(samples-1)/samples + got/(double)samples;
                samples++;
            }

            Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, avgchunk);
        }
    )
        .PromiseAll()
        .Join();

    Assert.AreEqual(summ, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        total
    );
}
510 | |
/// <summary>
/// Three writers push batches of 1s with <c>EnqueueRange</c> while two
/// readers repeatedly call <c>Drain</c>. Every item is 1, so the combined
/// sum must equal the number of items written.
/// </summary>
[TestMethod]
public void AsyncQueueDrainTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 32;
    const int wCount = 200000;
    const int total = wBatch * wCount * 3;
    // All writers emit 1s, so the expected sum equals the item count.
    const int summ = wBatch * wCount * 3;

    int r1 = 0, r2 = 0;
    int read = 0;

    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var count = 0;
            int emptyDrains = 0;

            // Both readers share 'read' and stop once all items are taken.
            while (read < total) {
                var buffer = queue.Drain();
                // Count drains that returned nothing (reported for diagnostics only).
                if (buffer.Count == 0)
                    emptyDrains++;
                for (int i = 0; i < buffer.Count; i++)
                    r1 += buffer[i];
                Interlocked.Add(ref read, buffer.Count);
                count += buffer.Count;
            }
            Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
        },
        () => {
            var count = 0;
            int emptyDrains = 0;

            while (read < total) {
                var buffer = queue.Drain();
                if (buffer.Count == 0)
                    emptyDrains++;

                for (int i = 0; i < buffer.Count; i++)
                    r2 += buffer[i];
                Interlocked.Add(ref read, buffer.Count);
                count += buffer.Count;
            }
            Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
        }
    )
        .PromiseAll()
        .Join();

    Assert.AreEqual(summ, r1 + r2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        r1,
        r2,
        r1 + r2,
        total
    );
}
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
625 |
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
/// <summary>
/// <c>ParallelMap</c> over 100000 random doubles must produce, element by
/// element, the same values as computing the function directly.
/// </summary>
[TestMethod]
public void ParallelMapTest() {

    const int count = 100000;

    var rand = new Random();
    var args = new double[count];

    for (int idx = 0; idx < args.Length; idx++)
        args[idx] = rand.NextDouble();

    var started = Environment.TickCount;
    // NOTE(review): the second argument (4) is presumably the degree of
    // parallelism - confirm against ParallelMap.
    var mapped = args.ParallelMap(x => Math.Sin(x*x), 4).Join();

    Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

    started = Environment.TickCount;
    for (int idx = 0; idx < count; idx++)
        Assert.AreEqual(Math.Sin(args[idx] * args[idx]), mapped[idx]);
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
}
647 | |
/// <summary>
/// <c>ChainedMap</c> runs each element through a promise-returning
/// function (here scheduled on a <c>WorkerPool</c>); the results must
/// match direct computation element by element.
/// </summary>
[TestMethod]
public void ChainedMapTest() {

    using (var pool = new WorkerPool()) {
        const int count = 10000;

        var rand = new Random();
        var args = new double[count];

        for (int idx = 0; idx < args.Length; idx++)
            args[idx] = rand.NextDouble();

        var started = Environment.TickCount;
        var mapped = args
            .ChainedMap(
                // Analysis disable once AccessToDisposedClosure
                x => pool.Invoke(
                    () => Math.Sin(x * x)
                ),
                4
            )
            .Join();

        Console.WriteLine("Map complete in {0} ms", Environment.TickCount - started);

        started = Environment.TickCount;
        for (int idx = 0; idx < count; idx++)
            Assert.AreEqual(Math.Sin(args[idx] * args[idx]), mapped[idx]);
        Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
        Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
    }
}
680 | |
/// <summary>
/// Summing 100000 random integers through <c>ParallelForEach</c> must give
/// the same total as a plain sequential loop.
/// </summary>
[TestMethod]
public void ParallelForEachTest() {

    const int count = 100000;

    var rand = new Random();
    var args = new int[count];

    for (int idx = 0; idx < args.Length; idx++)
        args[idx] = (int)(rand.NextDouble() * 100);

    int parallelSum = 0;

    var started = Environment.TickCount;
    // The callback runs concurrently, so the shared total is updated atomically.
    args.ParallelForEach(x => Interlocked.Add(ref parallelSum, x), 4).Join();

    Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - started, parallelSum);

    int sequentialSum = 0;

    started = Environment.TickCount;
    foreach (int value in args)
        sequentialSum += value;
    Assert.AreEqual(sequentialSum, parallelSum);
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - started);
}
707 | |
[TestMethod]
public void ComplexCase1Test() {
    // cancelled[i] is raised by the PromiseEventType.Cancelled handler attached
    // to the i-th stage of the chain built below
    var cancelled = new bool[3];

    // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

    var first = PromiseHelper
        .Sleep(200, "Alan")
        .On(() => cancelled[0] = true, PromiseEventType.Cancelled);

    var chain = first
        .Chain(name =>
            PromiseHelper
                .Sleep(200, "Hi, " + name)
                .Then(text => text)
                .On(() => cancelled[1] = true, PromiseEventType.Cancelled)
        )
        .On(() => cancelled[2] = true, PromiseEventType.Cancelled);

    // wait until the first step has finished, then cancel the chained promise
    first.Join();
    chain.Cancel();

    try {
        Assert.AreEqual(chain.Join(), "Hi, Alan");
        Assert.Fail("Shouldn't get here");
    } catch (OperationCanceledException) {
        // expected: joining the cancelled promise throws
    }

    // the first step completed before the cancellation, the other two did not
    Assert.IsFalse(cancelled[0]);
    Assert.IsTrue(cancelled[1]);
    Assert.IsTrue(cancelled[2]);
}
737 | |
[TestMethod]
public void ChainedCancel1Test() {
    // When a chained asynchronous operation is cancelled, the whole promise
    // must complete with an OperationCanceledException error.
    var outer = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Then(greeting => {
            // start two asynchronous operations: this is the long one (1000 ms)
            var hem = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");

            // the second operation (100 ms) cancels the first before it completes
            PromiseHelper
                .Sleep(100, "HAL, STOP!")
                .Then(hem.Cancel);

            return hem;
        });

    try {
        outer.Join();
        // NOTE(review): there is no Assert.Fail() here, so the test also passes
        // when Join() returns without throwing - confirm this is intended
    } catch (TargetInvocationException wrapped) {
        var cause = wrapped.InnerException;
        Assert.IsTrue(cause is OperationCanceledException);
    }
}
759 | |
[TestMethod]
public void ChainedCancel2Test() {
    // When a chain of promises is cancelled, the nested operations must be
    // cancelled as well.
    var nestedCancelled = new Promise<bool>();
    var nestedStarted = new Signal();

    var chain = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Chain(() => {
            nestedStarted.Set();

            // nested operation: reports false if it runs to completion...
            var nested = PromiseHelper
                .Sleep(2000, "HEM ENABLED!!!")
                .Then(() => nestedCancelled.Resolve(false));

            // ...and true if it gets cancelled
            nested.On(() => nestedCancelled.Resolve(true), PromiseEventType.Cancelled);

            return nested;
        });

    // cancel only after the nested operation has been started
    nestedStarted.Wait();
    chain.Cancel();

    try {
        chain.Join();
        Assert.Fail();
    } catch (OperationCanceledException) {
        // expected: joining the cancelled chain throws
    }

    Assert.IsTrue(nestedCancelled.Join());
}
136 | 790 |
[TestMethod]
public void SharedLockTest() {
    // Runs two readers and one writer against a single SharedLock. Each worker
    // records its progress in a queue that is printed when the test ends.
    var sharedLock = new SharedLock();
    int readers = 0;
    int writers = 0;
    var bothReadersIn = new Signal();
    var journal = new AsyncQueue<string>();

    // Reader #1: takes the shared lock and waits until both readers hold it.
    Action readerOne = () => {
        journal.Enqueue("Reader #1 started");
        try {
            sharedLock.LockShared();
            journal.Enqueue("Reader #1 lock got");
            // the signal is set by whichever reader is second to enter
            var active = Interlocked.Increment(ref readers);
            if (active == 2)
                bothReadersIn.Set();
            bothReadersIn.Wait();
            journal.Enqueue("Reader #1 finished");
            Interlocked.Decrement(ref readers);
        } finally {
            sharedLock.Release();
            journal.Enqueue("Reader #1 lock released");
        }
    };

    // Reader #2: same as reader #1, but then upgrades its lock and checks that
    // it is the only holder.
    Action readerTwo = () => {
        journal.Enqueue("Reader #2 started");

        try {
            sharedLock.LockShared();
            journal.Enqueue("Reader #2 lock got");

            var active = Interlocked.Increment(ref readers);
            if (active == 2)
                bothReadersIn.Set();
            bothReadersIn.Wait();
            journal.Enqueue("Reader #2 upgrading to writer");
            Interlocked.Decrement(ref readers);
            sharedLock.Upgrade();
            journal.Enqueue("Reader #2 upgraded");

            // after the upgrade there must be exactly one writer and no readers
            Assert.AreEqual(1, Interlocked.Increment(ref writers));
            Assert.AreEqual(0, readers);
            journal.Enqueue("Reader #2 finished");
            Interlocked.Decrement(ref writers);
        } finally {
            sharedLock.Release();
            journal.Enqueue("Reader #2 lock released");
        }
    };

    // Writer #1: takes the lock exclusively and checks that it is the only writer.
    Action writer = () => {
        journal.Enqueue("Writer #1 started");
        try {
            sharedLock.LockExclusive();
            journal.Enqueue("Writer #1 got the lock");
            Assert.AreEqual(1, Interlocked.Increment(ref writers));
            Interlocked.Decrement(ref writers);
            journal.Enqueue("Writer #1 is finished");
        } finally {
            sharedLock.Release();
            journal.Enqueue("Writer #1 lock released");
        }
    };

    try {
        // NOTE(review): 1000 is presumably a timeout in milliseconds - confirm against Join
        AsyncPool
            .RunThread(readerOne, readerTwo, writer)
            .PromiseAll()
            .Join(1000);
        journal.Enqueue("Done");
    } catch (Exception error) {
        journal.Enqueue(error.Message);
        throw;
    } finally {
        // print the recorded progress whether the test passed or failed
        foreach (var entry in journal)
            Console.WriteLine(entry);
    }
}
151 | 863 |
864 #if NET_4_5 | |
865 | |
[TestMethod]
public async System.Threading.Tasks.Task TaskInteropTest() {
    // Checks that a resolved Promise<int> can be awaited and yields its value.
    //
    // The method returns Task instead of void: a test runner cannot await an
    // "async void" method, so an assertion failing after the await would not
    // be attributed to this test. The type name is fully qualified because no
    // using directive for System.Threading.Tasks is visible in this file.
    var promise = new Promise<int>();
    promise.Resolve(10);
    var res = await promise;

    Assert.AreEqual(10, res);
}
874 | |
875 #endif | |
77 | 876 } |
877 } | |
878 |