comparison Implab.Test/AsyncTests.cs @ 124:a336cb13c6a9 v2

major update, added Drain mathod to AsyncQueue class
author cin
date Thu, 15 Jan 2015 02:43:14 +0300
parents 0c8685c8b56b
children f803565868a4
comparison
equal deleted inserted replaced
123:f4d6ea6969cc 124:a336cb13c6a9
247 const int writersCount = 10; 247 const int writersCount = 10;
248 248
249 for (int i = 0; i < writersCount; i++) { 249 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers); 250 Interlocked.Increment(ref writers);
251 AsyncPool 251 AsyncPool
252 .InvokeNewThread(() => { 252 .RunThread(() => {
253 for (int ii = 0; ii < itemsPerWriter; ii++) { 253 for (int ii = 0; ii < itemsPerWriter; ii++) {
254 queue.Enqueue(1); 254 queue.Enqueue(1);
255 } 255 }
256 return 1; 256 return 1;
257 }) 257 })
259 } 259 }
260 260
261 for (int i = 0; i < 10; i++) { 261 for (int i = 0; i < 10; i++) {
262 Interlocked.Increment(ref readers); 262 Interlocked.Increment(ref readers);
263 AsyncPool 263 AsyncPool
264 .InvokeNewThread(() => { 264 .RunThread(() => {
265 int t; 265 int t;
266 do { 266 do {
267 while (queue.TryDequeue(out t)) 267 while (queue.TryDequeue(out t))
268 Interlocked.Add(ref total, t); 268 Interlocked.Add(ref total, t);
269 } while (writers > 0); 269 } while (writers > 0);
334 res2 += temp; 334 res2 += temp;
335 } 335 }
336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
337 } 337 }
338 ) 338 )
339 .Combine() 339 .Bundle()
340 .Join(); 340 .Join();
341 341
342 Assert.AreEqual(count * 3, res1 + res2); 342 Assert.AreEqual(count * 3, res1 + res2);
343 343
344 Console.WriteLine( 344 Console.WriteLine(
412 } 412 }
413 413
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1);
415 } 415 }
416 ) 416 )
417 .Combine() 417 .Bundle()
418 .Join(); 418 .Join();
419 419
420 Assert.AreEqual(summ , r1 + r2); 420 Assert.AreEqual(summ , r1 + r2);
421 421
422 Console.WriteLine( 422 Console.WriteLine(
488 } 488 }
489 489
490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk); 490 Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - t1, avgchunk);
491 } 491 }
492 ) 492 )
493 .Combine() 493 .Bundle()
494 .Join();
495
496 Assert.AreEqual(summ , r1 + r2);
497
498 Console.WriteLine(
499 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
500 Environment.TickCount - t1,
501 r1,
502 r2,
503 r1 + r2,
504 total
505 );
506 }
507
508 [TestMethod]
509 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>();
511
512 const int wBatch = 11;
513 const int wCount = 200000;
514 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3;
516
517 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0;
520
521 var t1 = Environment.TickCount;
522
523 AsyncPool.RunThread(
524 () => {
525 var buffer = new int[wBatch];
526 for(int i = 0; i<wBatch; i++)
527 buffer[i] = 1;
528
529 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 },
533 () => {
534 for(int i =0; i < wCount * wBatch; i++)
535 queue.Enqueue(1);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 },
538 () => {
539 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++)
541 buffer[i] = 1;
542
543 for(int i =0; i < wCount; i++)
544 queue.EnqueueRange(buffer,0,wBatch);
545 Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
546 },
547 /*() => {
548 int temp;
549 int count = 0;
550 while (read < total)
551 if (queue.TryDequeue(out temp)) {
552 count++;
553 r1 += temp;
554 Interlocked.Increment(ref read);
555 }
556 Console.WriteLine("done reader #1: {0} ms, {1} count", Environment.TickCount - t1, count);
557 },*/
558 /*() => {
559 var buffer = new int[rBatch];
560 var count = 0;
561 while(read < total) {
562 int actual;
563 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) {
564 for(int i=0; i< actual; i++)
565 r1 += buffer[i];
566 Interlocked.Add(ref read, actual);
567 count += actual;
568 }
569 }
570
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/
573 () => {
574 var count = 0;
575 while(read < total) {
576 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++)
578 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length);
580 count += buffer.Length;
581 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
583 },
584 () => {
585 var count = 0;
586 while(read < total) {
587 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++)
589 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length);
591 count += buffer.Length;
592 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
594 }
595 )
596 .Bundle()
494 .Join(); 597 .Join();
495 598
496 Assert.AreEqual(summ , r1 + r2); 599 Assert.AreEqual(summ , r1 + r2);
497 600
498 Console.WriteLine( 601 Console.WriteLine(