comparison Implab.Test/AsyncTests.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents ec91a6dfa5b3
children 8200ab154c8a
comparison
equal deleted inserted replaced
71:1714fd8678ef 192:f1da3afc3521
1 using System; 1 using System;
2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using System.Reflection; 2 using System.Reflection;
4 using System.Threading; 3 using System.Threading;
5 using Implab.Parallels; 4 using Implab.Parallels;
5
6 #if MONO
7
8 using NUnit.Framework;
9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute;
10 using TestMethodAttribute = NUnit.Framework.TestAttribute;
11
12 #else
13
14 using Microsoft.VisualStudio.TestTools.UnitTesting;
15
16 #endif
6 17
7 namespace Implab.Test { 18 namespace Implab.Test {
8 [TestClass] 19 [TestClass]
9 public class AsyncTests { 20 public class AsyncTests {
10 [TestMethod] 21 [TestMethod]
/// <summary>
/// Rejecting a promise must run the error handler passed to <c>Then</c>
/// and must not run the success handler.
/// </summary>
public void RejectTest() {
    int res = -1;
    Exception err = null;

    var p = new Promise<int>();
    p.Then(
        x => res = x,
        e => {
            err = e;
            return -2;
        }
    );
    p.Reject(new ApplicationException("error"));

    // Expected value goes first so a failure report reads the right way round.
    Assert.AreEqual(-1, res);
    // Fail with an assertion instead of a NullReferenceException when the
    // error handler was never called.
    Assert.IsNotNull(err, "the error handler was not invoked");
    Assert.AreEqual("error", err.Message);
}
50
51 [TestMethod]
/// <summary>
/// An exception thrown from the cancellation handler must surface from
/// <c>Join</c> of the derived promise.
/// </summary>
public void CancelExceptionTest() {
    var p = new Promise<bool>();
    p.CancelOperation(null);

    var p2 = p.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    try {
        p2.Join();
        Assert.Fail("Join should have thrown");
    } catch (ApplicationException err) {
        // NOTE(review): the handler's exception is expected to arrive wrapped
        // (presumably in a TargetInvocationException, which derives from
        // ApplicationException) - confirm against Promise.Join.
        Assert.IsNotNull(err.InnerException, "the handler exception should be wrapped");
        Assert.AreEqual("CANCELLED", err.InnerException.Message);
    }
}
68
69 [TestMethod]
/// <summary>
/// A failure raised by the cancellation handler can be recovered by the
/// error handler of the next <c>Then</c> in the chain.
/// </summary>
public void ContinueOnCancelTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    // first link: the cancellation handler throws
    var faulted = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    // second link: the error handler substitutes a value
    var recovered = faulted.Then(x => x, e => true);

    Assert.AreEqual(true, recovered.Join());
}
33 82
34 [TestMethod] 83 [TestMethod]
35 public void JoinSuccessTest() { 84 public void JoinSuccessTest() {
36 var p = new Promise<int>(); 85 var p = new Promise<int>();
55 104
56 [TestMethod] 105 [TestMethod]
/// <summary>
/// <c>Then</c> with a single mapping function transforms the resolved value.
/// </summary>
public void MapTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x.ToString());
    p.Resolve(100);

    // expected value first, actual second
    Assert.AreEqual("100", p2.Join());
}
65 114
66 [TestMethod] 115 [TestMethod]
/// <summary>
/// The error handler passed to <c>Then</c> can replace a rejection with a value.
/// </summary>
public void FixErrorTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x, e => 101);

    p.Reject(new Exception());

    // expected value first, actual second
    Assert.AreEqual(101, p2.Join());
}
88 137
89 Assert.AreEqual(p3.Join(), "100"); 138 Assert.AreEqual(p3.Join(), "100");
90 } 139 }
91 140
92 [TestMethod] 141 [TestMethod]
/// <summary>
/// Chains onto a promise an operation whose own promise is rejected and
/// checks the state of the chained promise afterwards.
/// </summary>
public void ChainFailTest() {
    var outer = new Promise<int>();

    var chained = outer.Chain(x => {
        // the nested operation fails immediately
        var inner = new Promise<string>();
        inner.Reject(new Exception("DIE!!!"));
        return inner;
    });

    outer.Resolve(100);

    // NOTE(review): presumably IsResolved means "completed in any way",
    // including failure - confirm against the promise implementation.
    Assert.IsTrue(chained.IsResolved);
}
155
156 [TestMethod]
/// <summary>
/// Work submitted through <c>AsyncPool.Invoke</c> must run on a thread
/// other than the calling one.
/// </summary>
public void PoolTest() {
    var callerThreadId = Thread.CurrentThread.ManagedThreadId;

    var workerThreadId = AsyncPool
        .Invoke(() => Thread.CurrentThread.ManagedThreadId)
        .Join();

    Assert.AreNotEqual(callerThreadId, workerThreadId);
}
99 163
100 [TestMethod] 164 [TestMethod]
101 public void WorkerPoolSizeTest() { 165 public void WorkerPoolSizeTest() {
102 var pool = new WorkerPool(5, 10, 0); 166 var pool = new WorkerPool(5, 10, 1);
103 167
104 Assert.AreEqual(5, pool.PoolSize); 168 Assert.AreEqual(5, pool.PoolSize);
105 169
106 pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); 170 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
107 pool.Invoke(() => { Thread.Sleep(100000000); return 10; }); 171 pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
119 183
120 [TestMethod] 184 [TestMethod]
121 public void WorkerPoolCorrectTest() { 185 public void WorkerPoolCorrectTest() {
122 var pool = new WorkerPool(0,1000,100); 186 var pool = new WorkerPool(0,1000,100);
123 187
124 int iterations = 1000; 188 const int iterations = 1000;
125 int pending = iterations; 189 int pending = iterations;
126 var stop = new ManualResetEvent(false); 190 var stop = new ManualResetEvent(false);
127 191
128 var count = 0; 192 var count = 0;
129 for (int i = 0; i < iterations; i++) { 193 for (int i = 0; i < iterations; i++) {
130 pool 194 pool
131 .Invoke(() => 1) 195 .Invoke(() => 1)
132 .Then(x => Interlocked.Add(ref count, x)) 196 .Then(x => Interlocked.Add(ref count, x))
133 .Then(x => Math.Log10(x)) 197 .Then(x => Math.Log10(x))
134 .Anyway(() => { 198 .On(() => {
135 Interlocked.Decrement(ref pending); 199 Interlocked.Decrement(ref pending);
136 if (pending == 0) 200 if (pending == 0)
137 stop.Set(); 201 stop.Set();
138 }); 202 }, PromiseEventType.All);
139 } 203 }
140 204
141 stop.WaitOne(); 205 stop.WaitOne();
142 206
143 Assert.AreEqual(iterations, count); 207 Assert.AreEqual(iterations, count);
177 int writers = 0; 241 int writers = 0;
178 int readers = 0; 242 int readers = 0;
179 var stop = new ManualResetEvent(false); 243 var stop = new ManualResetEvent(false);
180 int total = 0; 244 int total = 0;
181 245
182 int itemsPerWriter = 1000; 246 const int itemsPerWriter = 10000;
183 int writersCount = 3; 247 const int writersCount = 10;
184 248
185 for (int i = 0; i < writersCount; i++) { 249 for (int i = 0; i < writersCount; i++) {
186 Interlocked.Increment(ref writers); 250 Interlocked.Increment(ref writers);
187 var wn = i;
188 AsyncPool 251 AsyncPool
189 .InvokeNewThread(() => { 252 .RunThread(() => {
190 for (int ii = 0; ii < itemsPerWriter; ii++) { 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
191 queue.Enqueue(1); 254 queue.Enqueue(1);
192 } 255 }
193 return 1; 256 return 1;
194 }) 257 })
195 .Anyway(() => Interlocked.Decrement(ref writers)); 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
196 } 259 }
197 260
198 for (int i = 0; i < 10; i++) { 261 for (int i = 0; i < 10; i++) {
199 Interlocked.Increment(ref readers); 262 Interlocked.Increment(ref readers);
200 var wn = i;
201 AsyncPool 263 AsyncPool
202 .InvokeNewThread(() => { 264 .RunThread(() => {
203 int t; 265 int t;
204 do { 266 do {
205 while (queue.TryDequeue(out t)) 267 while (queue.TryDequeue(out t))
206 Interlocked.Add(ref total, t); 268 Interlocked.Add(ref total, t);
207 } while (writers > 0); 269 } while (writers > 0);
208 return 1; 270 return 1;
209 }) 271 })
210 .Anyway(() => { 272 .On(() => {
211 Interlocked.Decrement(ref readers); 273 Interlocked.Decrement(ref readers);
212 if (readers == 0) 274 if (readers == 0)
213 stop.Set(); 275 stop.Set();
214 }); 276 }, PromiseEventType.All);
215 } 277 }
216 278
217 stop.WaitOne(); 279 stop.WaitOne();
218 280
219 Assert.AreEqual(itemsPerWriter * writersCount, total); 281 Assert.AreEqual(100000, total);
282 }
283
284 [TestMethod]
/// <summary>
/// Exercises <c>AsyncQueue</c> first from a single thread (dequeue from an
/// empty queue, FIFO order) and then with two writer and two reader threads.
/// </summary>
public void AsyncQueueTest() {
    var queue = new AsyncQueue<int>();
    int res;

    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // The dequeue itself must succeed; otherwise res is not a queued item
        // and the value comparison below could pass or fail by accident.
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int count = 10000000;

    int res1 = 0, res2 = 0;
    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        // writer #1 enqueues `count` ones
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        // writer #2 enqueues `count` twos
        () => {
            for (var i = 0; i < count; i++)
                queue.Enqueue(2);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        // each reader consumes exactly `count` items into its own sum
        () => {
            int temp;
            int i = 0;
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res1 += temp;
                }
            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            int temp;
            int i = 0;
            while (i < count)
                if (queue.TryDequeue(out temp)) {
                    i++;
                    res2 += temp;
                }
            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
        }
    )
        .Bundle()
        .Join();

    // count * 1 from writer #1 plus count * 2 from writer #2
    Assert.AreEqual(count * 3, res1 + res2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        res1,
        res2,
        res1 + res2,
        count
    );
}
353
354 [TestMethod]
/// <summary>
/// Two writers push fixed-size batches with <c>EnqueueRange</c> while two
/// readers pull with <c>TryDequeueRange</c>; the sum of everything read
/// must equal the sum of everything written.
/// </summary>
public void AsyncQueueBatchTest() {
    var queue = new AsyncQueue<int>();

    const int writeBatch = 29;
    const int writeRounds = 400000;
    const int totalItems = writeBatch * writeRounds * 2;    // two writers
    const int expectedSum = writeBatch * writeRounds * 3;   // values 1 and 2

    const int readBatch = 111;
    int sum1 = 0, sum2 = 0;
    int consumed = 0;   // items taken by both readers, updated with Interlocked

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var batch = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++) {
                batch[k] = 1;
            }

            for (int round = 0; round < writeRounds; round++) {
                queue.EnqueueRange(batch, 0, writeBatch);
            }
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++) {
                batch[k] = 2;
            }

            for (int round = 0; round < writeRounds; round++) {
                queue.EnqueueRange(batch, 0, writeBatch);
            }
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];

            while (consumed < totalItems) {
                int got;
                if (queue.TryDequeueRange(chunk, 0, readBatch, out got)) {
                    for (int k = 0; k < got; k++) {
                        sum1 += chunk[k];
                    }
                    Interlocked.Add(ref consumed, got);
                }
            }

            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];

            while (consumed < totalItems) {
                int got;
                if (queue.TryDequeueRange(chunk, 0, readBatch, out got)) {
                    for (int k = 0; k < got; k++) {
                        sum2 += chunk[k];
                    }
                    Interlocked.Add(ref consumed, got);
                }
            }

            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(expectedSum, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        totalItems
    );
}
431
432 [TestMethod]
/// <summary>
/// Three writers push batches with <c>EnqueueRange</c> while a single reader
/// pulls with <c>TryDequeueChunk</c>, tracking the average chunk size; the
/// sum read must equal the sum written.
/// </summary>
public void AsyncQueueChunkDequeueTest() {
    var queue = new AsyncQueue<int>();

    const int writeBatch = 31;
    const int writeRounds = 200000;
    const int totalItems = writeBatch * writeRounds * 3;    // three writers
    const int expectedSum = writeBatch * writeRounds * 6;   // values 1 + 2 + 3

    const int readBatch = 1024;
    // sum1 is never written: this test has only one reader, which uses sum2
    int sum1 = 0, sum2 = 0;
    int consumed = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var batch = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++) {
                batch[k] = 1;
            }

            for (int round = 0; round < writeRounds; round++) {
                queue.EnqueueRange(batch, 0, writeBatch);
            }
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++) {
                batch[k] = 2;
            }

            for (int round = 0; round < writeRounds; round++) {
                queue.EnqueueRange(batch, 0, writeBatch);
            }
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[writeBatch];
            for (int k = 0; k < writeBatch; k++) {
                batch[k] = 3;
            }

            for (int round = 0; round < writeRounds; round++) {
                queue.EnqueueRange(batch, 0, writeBatch);
            }
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[readBatch];
            int chunkNo = 1;
            double meanChunk = 0;   // running mean of the chunk sizes seen so far
            while (consumed < totalItems) {
                int got;
                if (queue.TryDequeueChunk(chunk, 0, readBatch, out got)) {
                    for (int k = 0; k < got; k++) {
                        sum2 += chunk[k];
                    }
                    Interlocked.Add(ref consumed, got);
                    meanChunk = meanChunk*(chunkNo-1)/chunkNo + got/(double)chunkNo;
                    chunkNo++;
                }
            }

            Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, meanChunk);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(expectedSum, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        totalItems
    );
}
507
508 [TestMethod]
/// <summary>
/// Three writers fill the queue (two in batches with <c>EnqueueRange</c>, one
/// item by item with <c>Enqueue</c>) while two readers empty it with
/// <c>Drain</c>; every written value is 1, so the sum read must equal the
/// number of items written.
/// </summary>
public void AsyncQueueDrainTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 11;
    const int wCount = 200000;
    const int total = wBatch * wCount * 3;  // items written by the three writers
    const int summ = wBatch * wCount * 3;   // every item is 1, so sum == count

    int r1 = 0, r2 = 0;
    int read = 0;   // items taken by both readers, updated with Interlocked

    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            // same number of items as the batch writers, one at a time
            for (int i = 0; i < wCount * wBatch; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r1 += buffer[i];
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r2 += buffer[i];
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(summ, r1 + r2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        r1,
        r2,
        r1 + r2,
        total
    );
}
221 610
222 [TestMethod] 611 [TestMethod]
223 public void ParallelMapTest() { 612 public void ParallelMapTest() {
224 613
225 int count = 100000; 614 const int count = 100000;
226 615
227 double[] args = new double[count]; 616 var args = new double[count];
228 var rand = new Random(); 617 var rand = new Random();
229 618
230 for (int i = 0; i < count; i++) 619 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble(); 620 args[i] = rand.NextDouble();
232 621
242 } 631 }
243 632
244 [TestMethod] 633 [TestMethod]
245 public void ChainedMapTest() { 634 public void ChainedMapTest() {
246 635
247 using (var pool = new WorkerPool(0,100,100)) { 636 using (var pool = new WorkerPool()) {
248 int count = 10000; 637 const int count = 10000;
249 638
250 double[] args = new double[count]; 639 var args = new double[count];
251 var rand = new Random(); 640 var rand = new Random();
252 641
253 for (int i = 0; i < count; i++) 642 for (int i = 0; i < count; i++)
254 args[i] = rand.NextDouble(); 643 args[i] = rand.NextDouble();
255 644
256 var t = Environment.TickCount; 645 var t = Environment.TickCount;
257 var res = args 646 var res = args
258 .ChainedMap( 647 .ChainedMap(
648 // Analysis disable once AccessToDisposedClosure
259 x => pool.Invoke( 649 x => pool.Invoke(
260 () => Math.Sin(x * x) 650 () => Math.Sin(x * x)
261 ), 651 ),
262 4 652 4
263 ) 653 )
274 } 664 }
275 665
276 [TestMethod] 666 [TestMethod]
277 public void ParallelForEachTest() { 667 public void ParallelForEachTest() {
278 668
279 int count = 100000; 669 const int count = 100000;
280 670
281 int[] args = new int[count]; 671 var args = new int[count];
282 var rand = new Random(); 672 var rand = new Random();
283 673
284 for (int i = 0; i < count; i++) 674 for (int i = 0; i < count; i++)
285 args[i] = (int)(rand.NextDouble() * 100); 675 args[i] = (int)(rand.NextDouble() * 100);
286 676
304 public void ComplexCase1Test() { 694 public void ComplexCase1Test() {
305 var flags = new bool[3]; 695 var flags = new bool[3];
306 696
307 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) 697 // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map)
308 698
309 var p = PromiseHelper 699 var step1 = PromiseHelper
310 .Sleep(200, "Alan") 700 .Sleep(200, "Alan")
311 .Cancelled(() => flags[0] = true) 701 .On(() => flags[0] = true, PromiseEventType.Cancelled);
702 var p = step1
312 .Chain(x => 703 .Chain(x =>
313 PromiseHelper 704 PromiseHelper
314 .Sleep(200, "Hi, " + x) 705 .Sleep(200, "Hi, " + x)
315 .Map(y => y) 706 .Then(y => y)
316 .Cancelled(() => flags[1] = true) 707 .On(() => flags[1] = true, PromiseEventType.Cancelled)
317 ) 708 )
318 .Cancelled(() => flags[2] = true); 709 .On(() => flags[2] = true, PromiseEventType.Cancelled);
319 Thread.Sleep(300); 710 step1.Join();
320 p.Cancel(); 711 p.Cancel();
321 try { 712 try {
322 Assert.AreEqual(p.Join(), "Hi, Alan"); 713 Assert.AreEqual(p.Join(), "Hi, Alan");
323 Assert.Fail("Shouldn't get here"); 714 Assert.Fail("Shouldn't get here");
324 } catch (OperationCanceledException) { 715 } catch (OperationCanceledException) {
329 Assert.IsTrue(flags[2]); 720 Assert.IsTrue(flags[2]);
330 } 721 }
331 722
332 [TestMethod] 723 [TestMethod]
333 public void ChainedCancel1Test() { 724 public void ChainedCancel1Test() {
334 // 725 // при отмене сцепленной асинхронной операции все обещание должно
335 // OperationCanceledException 726 // завершаться ошибкой OperationCanceledException
336 var p = PromiseHelper 727 var p = PromiseHelper
337 .Sleep(1, "Hi, HAL!") 728 .Sleep(1, "Hi, HAL!")
338 .Chain(x => { 729 .Then(x => {
339 // 730 // запускаем две асинхронные операции
340 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!"); 731 var result = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
341 // 732 // вторая операция отменяет первую до завершения
342 PromiseHelper 733 PromiseHelper
343 .Sleep(100, "HAL, STOP!") 734 .Sleep(100, "HAL, STOP!")
344 .Then(() => result.Cancel()); 735 .Then(result.Cancel);
345 return result; 736 return result;
346 }); 737 });
347 try { 738 try {
348 p.Join(); 739 p.Join();
349 } catch (TargetInvocationException err) { 740 } catch (TargetInvocationException err) {
351 } 742 }
352 } 743 }
353 744
354 [TestMethod] 745 [TestMethod]
/// <summary>
/// When a chain of promises is cancelled, the nested operations must be
/// cancelled as well.
/// </summary>
public void ChainedCancel2Test() {
    var pSurvive = new Promise<bool>();
    var hemStarted = new Signal();

    var greeting = PromiseHelper.Sleep(1, "Hi, HAL!");
    var p = greeting.Chain(() => {
        hemStarted.Set();

        // start the nested asynchronous operation; if it runs to completion
        // pSurvive is resolved with false
        var nested = PromiseHelper
            .Sleep(2000, "HEM ENABLED!!!")
            .Then(() => pSurvive.Resolve(false));

        // if it is cancelled instead, pSurvive is resolved with true
        nested.On(() => pSurvive.Resolve(true), PromiseEventType.Cancelled);

        return nested;
    });

    // cancel the outer promise only after the nested operation has started
    hemStarted.Wait();
    p.Cancel();

    try {
        p.Join();
        Assert.Fail();
    } catch (OperationCanceledException) {
        // expected: joining a cancelled promise throws
    }

    Assert.IsTrue(pSurvive.Join());
}
775
776 [TestMethod]
/// <summary>
/// Runs two readers and one writer against a <c>SharedLock</c>: both readers
/// hold the shared lock at the same time, one of them then upgrades to an
/// exclusive lock, and the writer takes the exclusive lock. The counters
/// verify that an exclusive holder is never accompanied by another holder.
/// </summary>
public void SharedLockTest() {
    var l = new SharedLock();
    int shared = 0;
    int exclusive = 0;
    var s1 = new Signal();  // set once both readers hold the shared lock
    var log = new AsyncQueue<string>();

    try {
        AsyncPool.RunThread(
            () => {
                log.Enqueue("Reader #1 started");
                // Acquire outside the try block: if the acquisition fails the
                // finally block must not release a lock that was never taken.
                l.LockShared();
                try {
                    log.Enqueue("Reader #1 lock got");
                    if (Interlocked.Increment(ref shared) == 2)
                        s1.Set();
                    s1.Wait();
                    log.Enqueue("Reader #1 finished");
                    Interlocked.Decrement(ref shared);
                } finally {
                    l.Release();
                    log.Enqueue("Reader #1 lock released");
                }
            },
            () => {
                log.Enqueue("Reader #2 started");

                l.LockShared();
                try {
                    log.Enqueue("Reader #2 lock got");

                    if (Interlocked.Increment(ref shared) == 2)
                        s1.Set();
                    s1.Wait();
                    log.Enqueue("Reader #2 upgrading to writer");
                    Interlocked.Decrement(ref shared);
                    l.Upgrade();
                    log.Enqueue("Reader #2 upgraded");

                    // after the upgrade this thread must be the only holder
                    Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                    Assert.AreEqual(0, shared);
                    log.Enqueue("Reader #2 finished");
                    Interlocked.Decrement(ref exclusive);
                } finally {
                    l.Release();
                    log.Enqueue("Reader #2 lock released");
                }
            },
            () => {
                log.Enqueue("Writer #1 started");
                l.LockExclusive();
                try {
                    log.Enqueue("Writer #1 got the lock");
                    Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                    Interlocked.Decrement(ref exclusive);
                    log.Enqueue("Writer #1 is finished");
                } finally {
                    l.Release();
                    log.Enqueue("Writer #1 lock released");
                }
            }
        ).Bundle().Join(1000);
        log.Enqueue("Done");
    } catch(Exception error) {
        log.Enqueue(error.Message);
        throw;
    } finally {
        // dump the event log whether or not the test passed
        foreach (var m in log)
            Console.WriteLine(m);
    }
}
848
849 #if NET_4_5
850
851 [TestMethod]
/// <summary>
/// A promise can be awaited directly; awaiting a resolved promise yields its value.
/// </summary>
/// <remarks>
/// Returns a Task rather than void so the test runner can wait for the method
/// to finish and observe an assertion failure raised after the await.
/// </remarks>
public async System.Threading.Tasks.Task TaskInteropTest() {
    var promise = new Promise<int>();
    promise.Resolve(10);
    var res = await promise;

    Assert.AreEqual(10, res);
}
859
860 #endif
384 } 861 }
385 } 862 }
386 863