Mercurial > pub > ImplabNet
comparison Implab.Test/AsyncTests.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
| author | cin |
|---|---|
| date | Sun, 11 Jan 2015 19:13:02 +0300 |
| parents | d4e38929ce36 |
| children | 62d2f1e98c4e |
comparison
equal
deleted
inserted
replaced
| 118:e046a94eecb1 | 119:2573b562e328 |
|---|---|
| 70 public void ContinueOnCancelTest() { | 70 public void ContinueOnCancelTest() { |
| 71 var p = new Promise<bool>(); | 71 var p = new Promise<bool>(); |
| 72 p.Cancel(); | 72 p.Cancel(); |
| 73 | 73 |
| 74 var p2 = p | 74 var p2 = p |
| 75 .Cancelled(() => { | 75 .Cancelled<bool>(() => { |
| 76 throw new ApplicationException("CANCELLED"); | 76 throw new ApplicationException("CANCELLED"); |
| 77 }) | 77 }) |
| 78 .Error(e => true); | 78 .Error(e => true); |
| 79 | 79 |
| 80 Assert.AreEqual(true, p2.Join()); | 80 Assert.AreEqual(true, p2.Join()); |
| 193 for (int i = 0; i < iterations; i++) { | 193 for (int i = 0; i < iterations; i++) { |
| 194 pool | 194 pool |
| 195 .Invoke(() => 1) | 195 .Invoke(() => 1) |
| 196 .Then(x => Interlocked.Add(ref count, x)) | 196 .Then(x => Interlocked.Add(ref count, x)) |
| 197 .Then(x => Math.Log10(x)) | 197 .Then(x => Math.Log10(x)) |
| 198 .Anyway(() => { | 198 .On(() => { |
| 199 Interlocked.Decrement(ref pending); | 199 Interlocked.Decrement(ref pending); |
| 200 if (pending == 0) | 200 if (pending == 0) |
| 201 stop.Set(); | 201 stop.Set(); |
| 202 }); | 202 }, PromiseEventType.All); |
| 203 } | 203 } |
| 204 | 204 |
| 205 stop.WaitOne(); | 205 stop.WaitOne(); |
| 206 | 206 |
| 207 Assert.AreEqual(iterations, count); | 207 Assert.AreEqual(iterations, count); |
| 253 for (int ii = 0; ii < itemsPerWriter; ii++) { | 253 for (int ii = 0; ii < itemsPerWriter; ii++) { |
| 254 queue.Enqueue(1); | 254 queue.Enqueue(1); |
| 255 } | 255 } |
| 256 return 1; | 256 return 1; |
| 257 }) | 257 }) |
| 258 .Anyway(() => Interlocked.Decrement(ref writers)); | 258 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); |
| 259 } | 259 } |
| 260 | 260 |
| 261 for (int i = 0; i < 10; i++) { | 261 for (int i = 0; i < 10; i++) { |
| 262 Interlocked.Increment(ref readers); | 262 Interlocked.Increment(ref readers); |
| 263 AsyncPool | 263 AsyncPool |
| 267 while (queue.TryDequeue(out t)) | 267 while (queue.TryDequeue(out t)) |
| 268 Interlocked.Add(ref total, t); | 268 Interlocked.Add(ref total, t); |
| 269 } while (writers > 0); | 269 } while (writers > 0); |
| 270 return 1; | 270 return 1; |
| 271 }) | 271 }) |
| 272 .Anyway(() => { | 272 .On(() => { |
| 273 Interlocked.Decrement(ref readers); | 273 Interlocked.Decrement(ref readers); |
| 274 if (readers == 0) | 274 if (readers == 0) |
| 275 stop.Set(); | 275 stop.Set(); |
| 276 }); | 276 }, PromiseEventType.All); |
| 277 } | |
| 278 | |
| 279 stop.WaitOne(); | |
| 280 | |
| 281 Assert.AreEqual(100000, total); | |
| 282 } | |
| 283 | |
| 284 [TestMethod] | |
| 285 public void AsyncQueueTest() { | |
| 286 var queue = new AsyncQueue<int>(); | |
| 287 int res; | |
| 288 | |
| 289 queue.Enqueue(10); | |
| 290 Assert.IsTrue(queue.TryDequeue(out res)); | |
| 291 Assert.AreEqual(10, res); | |
| 292 Assert.IsFalse(queue.TryDequeue(out res)); | |
| 293 | |
| 294 for (int i = 0; i < 1000; i++) | |
| 295 queue.Enqueue(i); | |
| 296 | |
| 297 for (int i = 0; i < 1000; i++) { | |
| 298 queue.TryDequeue(out res); | |
| 299 Assert.AreEqual(i, res); | |
| 300 } | |
| 301 | |
| 302 int writers = 0; | |
| 303 int readers = 0; | |
| 304 var stop = new ManualResetEvent(false); | |
| 305 int total = 0; | |
| 306 | |
| 307 const int itemsPerWriter = 10000; | |
| 308 const int writersCount = 10; | |
| 309 | |
| 310 for (int i = 0; i < writersCount; i++) { | |
| 311 Interlocked.Increment(ref writers); | |
| 312 AsyncPool | |
| 313 .InvokeNewThread(() => { | |
| 314 for (int ii = 0; ii < itemsPerWriter; ii++) { | |
| 315 queue.Enqueue(1); | |
| 316 } | |
| 317 return 1; | |
| 318 }) | |
| 319 .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All); | |
| 320 } | |
| 321 | |
| 322 for (int i = 0; i < 10; i++) { | |
| 323 Interlocked.Increment(ref readers); | |
| 324 AsyncPool | |
| 325 .InvokeNewThread(() => { | |
| 326 int t; | |
| 327 do { | |
| 328 while (queue.TryDequeue(out t)) | |
| 329 Interlocked.Add(ref total, t); | |
| 330 } while (writers > 0); | |
| 331 return 1; | |
| 332 }) | |
| 333 .On(() => { | |
| 334 Interlocked.Decrement(ref readers); | |
| 335 if (readers == 0) | |
| 336 stop.Set(); | |
| 337 }, PromiseEventType.All); | |
| 277 } | 338 } |
| 278 | 339 |
| 279 stop.WaitOne(); | 340 stop.WaitOne(); |
| 280 | 341 |
| 281 Assert.AreEqual(itemsPerWriter * writersCount, total); | 342 Assert.AreEqual(itemsPerWriter * writersCount, total); |
| 369 | 430 |
| 370 // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map) | 431 // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map) |
| 371 | 432 |
| 372 var step1 = PromiseHelper | 433 var step1 = PromiseHelper |
| 373 .Sleep(200, "Alan") | 434 .Sleep(200, "Alan") |
| 374 .Cancelled(() => flags[0] = true); | 435 .On(() => flags[0] = true, PromiseEventType.Cancelled); |
| 375 var p = step1 | 436 var p = step1 |
| 376 .Chain(x => | 437 .Chain(x => |
| 377 PromiseHelper | 438 PromiseHelper |
| 378 .Sleep(200, "Hi, " + x) | 439 .Sleep(200, "Hi, " + x) |
| 379 .Then(y => y) | 440 .Then(y => y) |
| 380 .Cancelled(() => flags[1] = true) | 441 .On(() => flags[1] = true, PromiseEventType.Cancelled) |
| 381 ) | 442 ) |
| 382 .Cancelled(() => flags[2] = true); | 443 .On(() => flags[2] = true, PromiseEventType.Cancelled); |
| 383 step1.Join(); | 444 step1.Join(); |
| 384 p.Cancel(); | 445 p.Cancel(); |
| 385 try { | 446 try { |
| 386 Assert.AreEqual(p.Join(), "Hi, Alan"); | 447 Assert.AreEqual(p.Join(), "Hi, Alan"); |
| 387 Assert.Fail("Shouldn't get here"); | 448 Assert.Fail("Shouldn't get here"); |
