Mercurial > pub > ImplabNet
comparison Implab.Test/AsyncTests.cs @ 121:62d2f1e98c4e v2
working version of AsyncQueue and batch operations
tests
author | cin |
---|---|
date | Mon, 12 Jan 2015 18:19:41 +0300 |
parents | 2573b562e328 |
children | 0c8685c8b56b |
comparison
equal
deleted
inserted
replaced
120:f1b897999260 | 121:62d2f1e98c4e |
---|---|
297 for (int i = 0; i < 1000; i++) { | 297 for (int i = 0; i < 1000; i++) { |
298 queue.TryDequeue(out res); | 298 queue.TryDequeue(out res); |
299 Assert.AreEqual(i, res); | 299 Assert.AreEqual(i, res); |
300 } | 300 } |
301 | 301 |
302 int writers = 0; | 302 const int count = 10000000; |
303 int readers = 0; | 303 |
304 var stop = new ManualResetEvent(false); | 304 int res1 = 0, res2 = 0; |
305 int total = 0; | 305 var t1 = Environment.TickCount; |
306 | 306 |
307 const int itemsPerWriter = 10000; | 307 AsyncPool.RunThread( |
308 const int writersCount = 10; | 308 () => { |
309 | 309 for (var i = 0; i < count; i++) |
310 for (int i = 0; i < writersCount; i++) { | 310 queue.Enqueue(1); |
311 Interlocked.Increment(ref writers); | 311 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); |
312 AsyncPool | 312 }, |
313 .InvokeNewThread(() => { | 313 () => { |
314 for (int ii = 0; ii < itemsPerWriter; ii++) { | 314 for (var i = 0; i < count; i++) |
315 queue.Enqueue(1); | 315 queue.Enqueue(2); |
316 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
317 }, | |
318 () => { | |
319 int temp; | |
320 int i = 0; | |
321 while (i < count) | |
322 if (queue.TryDequeue(out temp)) { | |
323 i++; | |
324 res1 += temp; | |
316 } | 325 } |
317 return 1; | 326 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); |
318 }) | 327 }, |
319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); | 328 () => { |
320 } | 329 int temp; |
321 | 330 int i = 0; |
322 for (int i = 0; i < 10; i++) { | 331 while (i < count) |
323 Interlocked.Increment(ref readers); | 332 if (queue.TryDequeue(out temp)) { |
324 AsyncPool | 333 i++; |
325 .InvokeNewThread(() => { | 334 res2 += temp; |
326 int t; | 335 } |
327 do { | 336 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); |
328 while (queue.TryDequeue(out t)) | 337 } |
329 Interlocked.Add(ref total, t); | 338 ) |
330 } while (writers > 0); | 339 .Combine() |
331 return 1; | 340 .Join(); |
332 }) | 341 |
333 .On(() => { | 342 Assert.AreEqual(count * 3, res1 + res2); |
334 Interlocked.Decrement(ref readers); | 343 |
335 if (readers == 0) | 344 Console.WriteLine( |
336 stop.Set(); | 345 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", |
337 }, PromiseEventType.All); | 346 Environment.TickCount - t1, |
338 } | 347 res1, |
339 | 348 res2, |
340 stop.WaitOne(); | 349 res1 + res2, |
341 | 350 count |
342 Assert.AreEqual(itemsPerWriter * writersCount, total); | 351 ); |
352 } | |
353 | |
354 [TestMethod] | |
355 public void AsyncQueueBatchTest() { | |
356 var queue = new AsyncQueue<int>(); | |
357 | |
358 const int wBatch = 29; | |
359 const int wCount = 400000; | |
360 const int total = wBatch * wCount * 2; | |
361 const int summ = wBatch * wCount * 3; | |
362 | |
363 int r1 = 0, r2 = 0; | |
364 const int rBatch = 111; | |
365 int read = 0; | |
366 | |
367 var t1 = Environment.TickCount; | |
368 | |
369 AsyncPool.RunThread( | |
370 () => { | |
371 var buffer = new int[wBatch]; | |
372 for(int i = 0; i<wBatch; i++) | |
373 buffer[i] = 1; | |
374 | |
375 for(int i =0; i < wCount; i++) | |
376 queue.EnqueueRange(buffer,0,wBatch); | |
377 Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1); | |
378 }, | |
379 () => { | |
380 var buffer = new int[wBatch]; | |
381 for(int i = 0; i<wBatch; i++) | |
382 buffer[i] = 2; | |
383 | |
384 for(int i =0; i < wCount; i++) | |
385 queue.EnqueueRange(buffer,0,wBatch); | |
386 Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1); | |
387 }, | |
388 () => { | |
389 var buffer = new int[rBatch]; | |
390 | |
391 while(read < total) { | |
392 int actual; | |
393 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
394 for(int i=0; i< actual; i++) | |
395 r1 += buffer[i]; | |
396 Interlocked.Add(ref read, actual); | |
397 } | |
398 } | |
399 | |
400 Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - t1); | |
401 }, | |
402 () => { | |
403 var buffer = new int[rBatch]; | |
404 | |
405 while(read < total) { | |
406 int actual; | |
407 if (queue.TryDequeueRange(buffer,0,rBatch,out actual)) { | |
408 for(int i=0; i< actual; i++) | |
409 r2 += buffer[i]; | |
410 Interlocked.Add(ref read, actual); | |
411 } | |
412 } | |
413 | |
414 Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - t1); | |
415 } | |
416 ) | |
417 .Combine() | |
418 .Join(); | |
419 | |
420 Assert.AreEqual(summ , r1 + r2); | |
421 | |
422 Console.WriteLine( | |
423 "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}", | |
424 Environment.TickCount - t1, | |
425 r1, | |
426 r2, | |
427 r1 + r2, | |
428 total | |
429 ); | |
343 } | 430 } |
344 | 431 |
345 [TestMethod] | 432 [TestMethod] |
346 public void ParallelMapTest() { | 433 public void ParallelMapTest() { |
347 | 434 |