Mercurial > pub > ImplabNet
comparison Implab.Test/AsyncTests.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
| author | cin |
|---|---|
| date | Mon, 12 Jan 2015 18:19:41 +0300 |
| parents | 2573b562e328 |
| children | 0c8685c8b56b |
Comparison legend: equal, deleted, inserted, replaced
| 120:f1b897999260 | 121:62d2f1e98c4e |
|---|---|
| 297 for (int i = 0; i < 1000; i++) { | 297 for (int i = 0; i < 1000; i++) { |
| 298 queue.TryDequeue(out res); | 298 queue.TryDequeue(out res); |
| 299 Assert.AreEqual(i, res); | 299 Assert.AreEqual(i, res); |
| 300 } | 300 } |
| 301 | 301 |
| 302 int writers = 0; | 302 const int count = 10000000; |
| 303 int readers = 0; | 303 |
| 304 var stop = new ManualResetEvent(false); | 304 int res1 = 0, res2 = 0; |
| 305 int total = 0; | 305 var t1 = Environment.TickCount; |
| 306 | 306 |
| 307 const int itemsPerWriter = 10000; | 307 AsyncPool.RunThread( |
| 308 const int writersCount = 10; | 308 () => { |
| 309 | 309 for (var i = 0; i < count; i++) |
| 310 for (int i = 0; i < writersCount; i++) { | 310 queue.Enqueue(1); |
| 311 Interlocked.Increment(ref writers); | 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); |
| 312 AsyncPool | 312 }, |
| 313 .InvokeNewThread(() => { | 313 () => { |
| 314 for (int ii = 0; ii < itemsPerWriter; ii++) { | 314 for (var i = 0; i < count; i++) |
| 315 queue.Enqueue(1); | 315 queue.Enqueue(2); |
| 316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
| 317 }, | |
| 318 () => { | |
| 319 int temp; | |
| 320 int i = 0; | |
| 321 while (i < count) | |
| 322 if (queue.TryDequeue(out temp)) { | |
| 323 i++; | |
| 324 res1 += temp; | |
| 316 } | 325 } |
| 317 return 1; | 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); |
| 318 }) | 327 }, |
| 319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); | 328 () => { |
| 320 } | 329 int temp; |
| 321 | 330 int i = 0; |
| 322 for (int i = 0; i < 10; i++) { | 331 while (i < count) |
| 323 Interlocked.Increment(ref readers); | 332 if (queue.TryDequeue(out temp)) { |
| 324 AsyncPool | 333 i++; |
| 325 .InvokeNewThread(() => { | 334 res2 += temp; |
| 326 int t; | 335 } |
| 327 do { | 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
| 328 while (queue.TryDequeue(out t)) | 337 } |
| 329 Interlocked.Add(ref total, t); | 338 ) |
| 330 } while (writers > 0); | 339 .Combine() |
| 331 return 1; | 340 .Join(); |
| 332 }) | 341 |
| 333 .On(() => { | 342 Assert.AreEqual(count * 3, res1 + res2); |
| 334 Interlocked.Decrement(ref readers); | 343 |
| 335 if (readers == 0) | 344 Console.WriteLine( |
| 336 stop.Set(); | 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", |
| 337 }, PromiseEventType.All); | 346 Environment.TickCount - t1, |
| 338 } | 347 res1, |
| 339 | 348 res2, |
| 340 stop.WaitOne(); | 349 res1 + res2, |
| 341 | 350 count |
| 342 Assert.AreEqual(itemsPerWriter * writersCount, total); | 351 ); |
| 352 } | |
| 353 | |
| 354 [TestMethod] | |
| 355 public void AsyncQueueBatchTest() { | |
| 356 var queue = new AsyncQueue<int>(); | |
| 357 | |
| 358 const int wBatch = 29; | |
| 359 const int wCount = 400000; | |
| 360 const int total = wBatch * wCount * 2; | |
| 361 const int summ = wBatch * wCount * 3; | |
| 362 | |
| 363 int r1 = 0, r2 = 0; | |
| 364 const int rBatch = 111; | |
| 365 int read = 0; | |
| 366 | |
| 367 var t1 = Environment.TickCount; | |
| 368 | |
| 369 AsyncPool.RunThread( | |
| 370 () => { | |
| 371 var buffer = new int[wBatch]; | |
| 372 for(int i = 0; i<wBatch; i++) | |
| 373 buffer[i] = 1; | |
| 374 | |
| 375 for(int i =0; i < wCount; i++) | |
| 376 queue.EnqueueRange(buffer,0,wBatch); | |
| 377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
| 378 }, | |
| 379 () => { | |
| 380 var buffer = new int[wBatch]; | |
| 381 for(int i = 0; i<wBatch; i++) | |
| 382 buffer[i] = 2; | |
| 383 | |
| 384 for(int i =0; i < wCount; i++) | |
| 385 queue.EnqueueRange(buffer,0,wBatch); | |
| 386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
| 387 }, | |
| 388 () => { | |
| 389 var buffer = new int[rBatch]; | |
| 390 | |
| 391 while(read < total) { | |
| 392 int actual; | |
| 393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
| 394 for(int i=0; i< actual; i++) | |
| 395 r1 += buffer[i]; | |
| 396 Interlocked.Add(ref read, actual); | |
| 397 } | |
| 398 } | |
| 399 | |
| 400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | |
| 401 }, | |
| 402 () => { | |
| 403 var buffer = new int[rBatch]; | |
| 404 | |
| 405 while(read < total) { | |
| 406 int actual; | |
| 407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
| 408 for(int i=0; i< actual; i++) | |
| 409 r2 += buffer[i]; | |
| 410 Interlocked.Add(ref read, actual); | |
| 411 } | |
| 412 } | |
| 413 | |
| 414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |
| 415 } | |
| 416 ) | |
| 417 .Combine() | |
| 418 .Join(); | |
| 419 | |
| 420 Assert.AreEqual(summ , r1 + r2); | |
| 421 | |
| 422 Console.WriteLine( | |
| 423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |
| 424 Environment.TickCount - t1, | |
| 425 r1, | |
| 426 r2, | |
| 427 r1 + r2, | |
| 428 total | |
| 429 ); | |
| 343 } | 430 } |
| 344 | 431 |
| 345 [TestMethod] | 432 [TestMethod] |
| 346 public void ParallelMapTest() { | 433 public void ParallelMapTest() { |
| 347 | 434 |
