Mercurial > pub > ImplabNet
comparison Implab.Test/AsyncTests.cs @ 124:a336cb13c6a9 v2
major update, added Drain method to AsyncQueue class
author | cin |
---|---|
date | Thu, 15 Jan 2015 02:43:14 +0300 |
parents | 0c8685c8b56b |
children | f803565868a4 |
comparison
equal
deleted
inserted
replaced
123:f4d6ea6969cc | 124:a336cb13c6a9 |
---|---|
247 const int writersCount = 10; | 247 const int writersCount = 10; |
248 | 248 |
249 for (int i = 0; i < writersCount; i++) { | 249 for (int i = 0; i < writersCount; i++) { |
250 Interlocked.Increment(ref writers); | 250 Interlocked.Increment(ref writers); |
251 AsyncPool | 251 AsyncPool |
252 .InvokeNewThread(() => { | 252 .RunThread(() => { |
253 for (int ii = 0; ii < itemsPerWriter; ii++) { | 253 for (int ii = 0; ii < itemsPerWriter; ii++) { |
254 queue.Enqueue(1); | 254 queue.Enqueue(1); |
255 } | 255 } |
256 return 1; | 256 return 1; |
257 }) | 257 }) |
259 } | 259 } |
260 | 260 |
261 for (int i = 0; i < 10; i++) { | 261 for (int i = 0; i < 10; i++) { |
262 Interlocked.Increment(ref readers); | 262 Interlocked.Increment(ref readers); |
263 AsyncPool | 263 AsyncPool |
264 .InvokeNewThread(() => { | 264 .RunThread(() => { |
265 int t; | 265 int t; |
266 do { | 266 do { |
267 while (queue.TryDequeue(out t)) | 267 while (queue.TryDequeue(out t)) |
268 Interlocked.Add(ref total, t); | 268 Interlocked.Add(ref total, t); |
269 } while (writers > 0); | 269 } while (writers > 0); |
334 res2 += temp; | 334 res2 += temp; |
335 } | 335 } |
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
337 } | 337 } |
338 ) | 338 ) |
339 .Combine() | 339 .Bundle() |
340 .Join(); | 340 .Join(); |
341 | 341 |
342 Assert.AreEqual(count * 3, res1 + res2); | 342 Assert.AreEqual(count * 3, res1 + res2); |
343 | 343 |
344 Console.WriteLine( | 344 Console.WriteLine( |
412 } | 412 } |
413 | 413 |
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
415 } | 415 } |
416 ) | 416 ) |
417 .Combine() | 417 .Bundle() |
418 .Join(); | 418 .Join(); |
419 | 419 |
420 Assert.AreEqual(summ , r1 + r2); | 420 Assert.AreEqual(summ , r1 + r2); |
421 | 421 |
422 Console.WriteLine( | 422 Console.WriteLine( |
488 } | 488 } |
489 | 489 |
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); | 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); |
491 } | 491 } |
492 ) | 492 ) |
493 .Combine() | 493 .Bundle() |
494 .Join(); | |
495 | |
496 Assert.AreEqual(summ , r1 + r2); | |
497 | |
498 Console.WriteLine( | |
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |
500 Environment.TickCount - t1, | |
501 r1, | |
502 r2, | |
503 r1 + r2, | |
504 total | |
505 ); | |
506 } | |
507 | |
508 [TestMethod] | |
509 public void AsyncQueueDrainTest() { | |
510 var queue = new AsyncQueue<int>(); | |
511 | |
512 const int wBatch = 11; | |
513 const int wCount = 200000; | |
514 const int total = wBatch * wCount * 3; | |
515 const int summ = wBatch * wCount * 3; | |
516 | |
517 int r1 = 0, r2 = 0; | |
518 const int rBatch = 11; | |
519 int read = 0; | |
520 | |
521 var t1 = Environment.TickCount; | |
522 | |
523 AsyncPool.RunThread( | |
524 () => { | |
525 var buffer = new int[wBatch]; | |
526 for(int i = 0; i<wBatch; i++) | |
527 buffer[i] = 1; | |
528 | |
529 for(int i =0; i < wCount; i++) | |
530 queue.EnqueueRange(buffer,0,wBatch); | |
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
532 }, | |
533 () => { | |
534 for(int i =0; i < wCount * wBatch; i++) | |
535 queue.Enqueue(1); | |
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
537 }, | |
538 () => { | |
539 var buffer = new int[wBatch]; | |
540 for(int i = 0; i<wBatch; i++) | |
541 buffer[i] = 1; | |
542 | |
543 for(int i =0; i < wCount; i++) | |
544 queue.EnqueueRange(buffer,0,wBatch); | |
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1); | |
546 }, | |
547 /*() => { | |
548 int temp; | |
549 int count = 0; | |
550 while (read < total) | |
551 if (queue.TryDequeue(out temp)) { | |
552 count++; | |
553 r1 += temp; | |
554 Interlocked.Increment(ref read); | |
555 } | |
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count); | |
557 },*/ | |
558 /*() => { | |
559 var buffer = new int[rBatch]; | |
560 var count = 0; | |
561 while(read < total) { | |
562 int actual; | |
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
564 for(int i=0; i< actual; i++) | |
565 r1 += buffer[i]; | |
566 Interlocked.Add(ref read, actual); | |
567 count += actual; | |
568 } | |
569 } | |
570 | |
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |
572 },*/ | |
573 () => { | |
574 var count = 0; | |
575 while(read < total) { | |
576 var buffer = queue.Drain(); | |
577 for(int i=0; i< buffer.Length; i++) | |
578 r1 += buffer[i]; | |
579 Interlocked.Add(ref read, buffer.Length); | |
580 count += buffer.Length; | |
581 } | |
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); | |
583 }, | |
584 () => { | |
585 var count = 0; | |
586 while(read < total) { | |
587 var buffer = queue.Drain(); | |
588 for(int i=0; i< buffer.Length; i++) | |
589 r2 += buffer[i]; | |
590 Interlocked.Add(ref read, buffer.Length); | |
591 count += buffer.Length; | |
592 } | |
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); | |
594 } | |
595 ) | |
596 .Bundle() | |
494 .Join(); | 597 .Join(); |
495 | 598 |
496 Assert.AreEqual(summ , r1 + r2); | 599 Assert.AreEqual(summ , r1 + r2); |
497 | 600 |
498 Console.WriteLine( | 601 Console.WriteLine( |