comparison Implab.Test/AsyncTests.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents 8200ab154c8a
children
comparison
equal deleted inserted replaced
229:5f7a3e1d32b9 233:d6fe09f5592c
220 pool.Dispose(); 220 pool.Dispose();
221 } 221 }
222 222
223 [TestMethod] 223 [TestMethod]
224 public void MTQueueTest() { 224 public void MTQueueTest() {
225 var queue = new MTQueue<int>(); 225 var queue = new SimpleAsyncQueue<int>();
226 int res; 226 int res;
227 227
228 queue.Enqueue(10); 228 queue.Enqueue(10);
229 Assert.IsTrue(queue.TryDequeue(out res)); 229 Assert.IsTrue(queue.TryDequeue(out res));
230 Assert.AreEqual(10, res); 230 Assert.AreEqual(10, res);
231 Assert.IsFalse(queue.TryDequeue(out res)); 231 Assert.IsFalse(queue.TryDequeue(out res));
232 232
240 240
241 int writers = 0; 241 int writers = 0;
242 int readers = 0; 242 int readers = 0;
243 var stop = new ManualResetEvent(false); 243 var stop = new ManualResetEvent(false);
244 int total = 0; 244 int total = 0;
245 245 var ticks = Environment.TickCount;
246 const int itemsPerWriter = 10000; 246
247 const int itemsPerWriter = 1000000;
247 const int writersCount = 10; 248 const int writersCount = 10;
248 249
249 for (int i = 0; i < writersCount; i++) { 250 for (int i = 0; i < writersCount; i++) {
250 Interlocked.Increment(ref writers); 251 Interlocked.Increment(ref writers);
251 AsyncPool 252 AsyncPool
276 }, PromiseEventType.All); 277 }, PromiseEventType.All);
277 } 278 }
278 279
279 stop.WaitOne(); 280 stop.WaitOne();
280 281
281 Assert.AreEqual(100000, total); 282 Console.WriteLine("{0} in {1}ms", total, Environment.TickCount - ticks);
283
284 Assert.AreEqual(itemsPerWriter * writersCount, total);
282 } 285 }
283 286
284 [TestMethod] 287 [TestMethod]
285 public void AsyncQueueTest() { 288 public void AsyncQueueTest() {
286 var queue = new AsyncQueue<int>(); 289 var queue = new AsyncQueue<int>();
507 510
508 [TestMethod] 511 [TestMethod]
509 public void AsyncQueueDrainTest() { 512 public void AsyncQueueDrainTest() {
510 var queue = new AsyncQueue<int>(); 513 var queue = new AsyncQueue<int>();
511 514
512 const int wBatch = 11; 515 const int wBatch = 32;
513 const int wCount = 200000; 516 const int wCount = 200000;
514 const int total = wBatch * wCount * 3; 517 const int total = wBatch * wCount * 3;
515 const int summ = wBatch * wCount * 3; 518 const int summ = wBatch * wCount * 3;
516 519
517 int r1 = 0, r2 = 0; 520 int r1 = 0, r2 = 0;
518 const int rBatch = 11;
519 int read = 0; 521 int read = 0;
520 522
521 var t1 = Environment.TickCount; 523 var t1 = Environment.TickCount;
522 524
523 AsyncPool.RunThread( 525 AsyncPool.RunThread(
529 for(int i =0; i < wCount; i++) 531 for(int i =0; i < wCount; i++)
530 queue.EnqueueRange(buffer,0,wBatch); 532 queue.EnqueueRange(buffer,0,wBatch);
531 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); 533 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
532 }, 534 },
533 () => { 535 () => {
534 for(int i =0; i < wCount * wBatch; i++) 536 var buffer = new int[wBatch];
535 queue.Enqueue(1); 537 for (int i = 0; i < wBatch; i++)
538 buffer[i] = 1;
539
540 for (int i = 0; i < wCount; i++)
541 queue.EnqueueRange(buffer, 0, wBatch);
536 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); 542 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
537 }, 543 },
538 () => { 544 () => {
539 var buffer = new int[wBatch]; 545 var buffer = new int[wBatch];
540 for(int i = 0; i<wBatch; i++) 546 for(int i = 0; i<wBatch; i++)
570 576
571 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); 577 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
572 },*/ 578 },*/
573 () => { 579 () => {
574 var count = 0; 580 var count = 0;
575 while(read < total) { 581 int emptyDrains = 0;
582
583 while (read < total) {
576 var buffer = queue.Drain(); 584 var buffer = queue.Drain();
577 for(int i=0; i< buffer.Length; i++) 585 if (buffer.Count == 0)
586 emptyDrains++;
587 for(int i=0; i< buffer.Count; i++)
578 r1 += buffer[i]; 588 r1 += buffer[i];
579 Interlocked.Add(ref read, buffer.Length); 589 Interlocked.Add(ref read, buffer.Count);
580 count += buffer.Length; 590 count += buffer.Count;
581 } 591 }
582 Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count); 592 Console.WriteLine("done reader #1: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
583 }, 593 },
584 () => { 594 () => {
585 var count = 0; 595 var count = 0;
586 while(read < total) { 596 int emptyDrains = 0;
597
598 while (read < total) {
587 var buffer = queue.Drain(); 599 var buffer = queue.Drain();
588 for(int i=0; i< buffer.Length; i++) 600 if (buffer.Count == 0)
601 emptyDrains++;
602
603 for (int i=0; i< buffer.Count; i++)
589 r2 += buffer[i]; 604 r2 += buffer[i];
590 Interlocked.Add(ref read, buffer.Length); 605 Interlocked.Add(ref read, buffer.Count);
591 count += buffer.Length; 606 count += buffer.Count;
592 } 607 }
593 Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count); 608 Console.WriteLine("done reader #2: {0} ms, {1} items, empty: {2}", Environment.TickCount - t1, count, emptyDrains);
594 } 609 }
595 ) 610 )
596 .PromiseAll() 611 .PromiseAll()
597 .Join(); 612 .Join();
598 613