Mercurial > pub > ImplabNet
annotate Implab.Test/AsyncTests.cs @ 178:d5c5db0335ee ref20160224
working on JSON parser
| author | cin |
|---|---|
| date | Wed, 23 Mar 2016 19:51:45 +0300 |
| parents | ec91a6dfa5b3 |
| children | 8200ab154c8a |
| rev | line source |
|---|---|
| 77 | 1 using System; |
| 2 using System.Reflection; | |
| 3 using System.Threading; | |
| 4 using Implab.Parallels; | |
| 5 | |
| 6 #if MONO | |
| 7 | |
| 8 using NUnit.Framework; | |
| 9 using TestClassAttribute = NUnit.Framework.TestFixtureAttribute; | |
| 145 | 10 using TestMethodAttribute = NUnit.Framework.TestAttribute; |
| 77 | 11 |
| 12 #else | |
| 13 | |
| 14 using Microsoft.VisualStudio.TestTools.UnitTesting; | |
| 15 | |
| 16 #endif | |
| 17 | |
| 18 namespace Implab.Test { | |
| 19 [TestClass] | |
| 20 public class AsyncTests { | |
/// <summary>
/// Resolving a promise passes the value to the handler registered with Then.
/// The assertion runs right after Resolve, so the test expects the handler
/// to have been invoked by the time Resolve returns.
/// </summary>
[TestMethod]
public void ResolveTest() {
    // Sentinel that the success handler is expected to overwrite.
    int received = -1;

    var promise = new Promise<int>();
    promise.Then(value => received = value);
    promise.Resolve(100);

    Assert.AreEqual(100, received);
}
| 30 | |
/// <summary>
/// Rejecting a promise must invoke the error handler and skip the success
/// handler registered with the same Then call.
/// </summary>
[TestMethod]
public void RejectTest() {
    int res = -1;
    Exception err = null;

    var p = new Promise<int>();
    p.Then(
        x => res = x,
        e => {
            err = e;
            return -2;
        }
    );
    p.Reject(new ApplicationException("error"));

    // The success handler must not have run, so the sentinel is untouched.
    Assert.AreEqual(-1, res);
    // Fail with a clear assertion instead of a NullReferenceException when
    // the error handler was never invoked.
    Assert.IsNotNull(err);
    Assert.AreEqual("error", err.Message);
}
| 50 | |
/// <summary>
/// An exception thrown from the cancellation handler of Then must surface
/// from Join on the resulting promise, with the thrown exception available
/// as the InnerException of whatever Join raises.
/// </summary>
[TestMethod]
public void CancelExceptionTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    // The third handler is the cancellation handler; it always throws.
    var continuation = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    try {
        continuation.Join();
        // Join returning normally means the exception was lost.
        Assert.Fail();
    } catch (ApplicationException caught) {
        Assert.AreEqual("CANCELLED", caught.InnerException.Message);
    }
}
| 68 | |
/// <summary>
/// A failure raised by the cancellation handler can be recovered by the
/// error handler of a subsequent Then, so Join yields the recovery value.
/// </summary>
[TestMethod]
public void ContinueOnCancelTest() {
    var source = new Promise<bool>();
    source.CancelOperation(null);

    // First stage: the cancellation handler throws.
    var failing = source.Then(x => x, null, reason => {
        throw new ApplicationException("CANCELLED");
    });

    // Second stage: the error handler replaces the failure with 'true'.
    var recovered = failing.Then(x => x, e => true);

    Assert.AreEqual(true, recovered.Join());
}
| 82 | |
/// <summary>
/// Join on an already resolved promise returns the resolved value.
/// </summary>
[TestMethod]
public void JoinSuccessTest() {
    var p = new Promise<int>();
    p.Resolve(100);

    // Expected value goes first so that a failure message reads correctly.
    Assert.AreEqual(100, p.Join());
}
| 89 | |
/// <summary>
/// Join on a rejected promise must throw a TargetInvocationException whose
/// InnerException is the rejection reason.
/// </summary>
[TestMethod]
public void JoinFailTest() {
    var p = new Promise<int>();
    p.Reject(new ApplicationException("failed"));

    try {
        p.Join();
    } catch (TargetInvocationException err) {
        Assert.AreEqual("failed", err.InnerException.Message);
        return;
    } catch (Exception err) {
        Assert.Fail("Got wrong exception: " + err.GetType().FullName);
    }

    // Reported outside the try block so that the "did not throw" case is
    // not caught above and misreported as a wrong exception type.
    Assert.Fail("Join was expected to throw but returned normally");
}
| 104 | |
/// <summary>
/// Then with a mapping function produces a promise of the mapped value.
/// </summary>
[TestMethod]
public void MapTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x.ToString());
    p.Resolve(100);

    // Expected value goes first so that a failure message reads correctly.
    Assert.AreEqual("100", p2.Join());
}
| 114 | |
/// <summary>
/// The error handler passed to Then can replace a rejection with a value,
/// which Join on the resulting promise then returns.
/// </summary>
[TestMethod]
public void FixErrorTest() {
    var p = new Promise<int>();

    var p2 = p.Then(x => x, e => 101);

    p.Reject(new Exception());

    // Expected value goes first so that a failure message reads correctly.
    Assert.AreEqual(101, p2.Join());
}
| 125 | |
/// <summary>
/// Chain continues with the promise returned by the callback; the outer
/// promise completes with the inner promise's value.
/// </summary>
[TestMethod]
public void ChainTest() {
    var p1 = new Promise<int>();

    var p3 = p1.Chain(x => {
        var p2 = new Promise<string>();
        p2.Resolve(x.ToString());
        return p2;
    });

    p1.Resolve(100);

    // Expected value goes first so that a failure message reads correctly.
    Assert.AreEqual("100", p3.Join());
}
| 140 | |
/// <summary>
/// Chains a callback that returns an already rejected promise and checks
/// that the outer promise reports IsResolved afterwards.
/// NOTE(review): presumably IsResolved means "completed in any state" in
/// this library; confirm against the Promise implementation.
/// </summary>
[TestMethod]
public void ChainFailTest() {
    var outer = new Promise<int>();

    var chained = outer.Chain(x => {
        var inner = new Promise<string>();
        inner.Reject(new Exception("DIE!!!"));
        return inner;
    });

    outer.Resolve(100);

    Assert.IsTrue(chained.IsResolved);
}
| 155 | |
/// <summary>
/// Work submitted through AsyncPool.Invoke must run on a thread other than
/// the calling one.
/// </summary>
[TestMethod]
public void PoolTest() {
    var callerThreadId = Thread.CurrentThread.ManagedThreadId;

    // The delegate reports the id of whichever thread executes it.
    var pending = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
    var workerThreadId = pending.Join();

    Assert.AreNotEqual(callerThreadId, workerThreadId);
}
| 163 | |
/// <summary>
/// Checks how WorkerPool.PoolSize reacts to load: 5 right after
/// construction, still 5 with three blocked tasks, and 10 after a hundred
/// more blocked tasks have been queued.
/// NOTE(review): the constructor arguments (5, 10, 1) are presumably the
/// minimum thread count, maximum thread count and a growth threshold;
/// confirm against WorkerPool.
/// </summary>
[TestMethod]
public void WorkerPoolSizeTest() {
    // 'using' disposes the pool even when an assertion fails, so a failed
    // run does not leave the pool and its long-sleeping tasks behind.
    using (var pool = new WorkerPool(5, 10, 1)) {
        Assert.AreEqual(5, pool.PoolSize);

        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });
        pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        Assert.AreEqual(5, pool.PoolSize);

        for (int i = 0; i < 100; i++)
            pool.Invoke(() => { Thread.Sleep(100000000); return 10; });

        // Give the pool some time to start additional workers.
        Thread.Sleep(200);
        Assert.AreEqual(10, pool.PoolSize);
    }
}
| 183 | |
/// <summary>
/// Submits 1000 small jobs to a WorkerPool, each followed by two Then
/// stages, and verifies that every job contributed to the shared counter.
/// </summary>
[TestMethod]
public void WorkerPoolCorrectTest() {
    // 'using' disposes the pool even when the assertion below fails.
    using (var pool = new WorkerPool(0, 1000, 100)) {
        const int iterations = 1000;
        int pending = iterations;
        var stop = new ManualResetEvent(false);

        var count = 0;
        for (int i = 0; i < iterations; i++) {
            pool
                .Invoke(() => 1)
                .Then(x => Interlocked.Add(ref count, x))
                .Then(x => Math.Log10(x))
                .On(() => {
                    // Use the value returned by the atomic decrement;
                    // re-reading the shared counter afterwards is not
                    // atomic with the decrement itself.
                    if (Interlocked.Decrement(ref pending) == 0)
                        stop.Set();
                }, PromiseEventType.All);
        }

        // Block until the last job has signalled completion.
        stop.WaitOne();

        Assert.AreEqual(iterations, count);
        Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
    }
}
| 212 | |
/// <summary>
/// Disposing a WorkerPool must bring PoolSize down to zero, and a second
/// Dispose call must be accepted.
/// </summary>
[TestMethod]
public void WorkerPoolDisposeTest() {
    const int initialSize = 5;

    var workers = new WorkerPool(initialSize, 20);
    Assert.AreEqual(initialSize, workers.PoolSize);

    workers.Dispose();
    // Leave the worker threads some time to exit before checking the size.
    Thread.Sleep(500);
    Assert.AreEqual(0, workers.PoolSize);

    // Repeated disposal is part of what this test exercises.
    workers.Dispose();
}
| 222 | |
/// <summary>
/// Exercises MTQueue: a single-item round trip, in-order dequeue on one
/// thread, and finally several writer and reader threads working on the
/// same queue, after which every enqueued item must have been counted.
/// </summary>
[TestMethod]
public void MTQueueTest() {
    var queue = new MTQueue<int>();
    int res;

    // One item in, the same item out, then the queue reports empty.
    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out res));
    Assert.AreEqual(10, res);
    Assert.IsFalse(queue.TryDequeue(out res));

    for (int i = 0; i < 1000; i++)
        queue.Enqueue(i);

    for (int i = 0; i < 1000; i++) {
        // A failed dequeue would leave 'res' stale; report it explicitly.
        Assert.IsTrue(queue.TryDequeue(out res));
        Assert.AreEqual(i, res);
    }

    const int itemsPerWriter = 10000;
    const int writersCount = 10;
    const int readersCount = 10;

    // Both counters start at their final value. Incrementing 'readers'
    // while earlier readers are already running would let the counter hit
    // zero, and 'stop' fire, before all readers have been started.
    int writers = writersCount;
    int readers = readersCount;
    var stop = new ManualResetEvent(false);
    int total = 0;

    for (int i = 0; i < writersCount; i++) {
        AsyncPool
            .RunThread(() => {
                for (int ii = 0; ii < itemsPerWriter; ii++) {
                    queue.Enqueue(1);
                }
                return 1;
            })
            .On(() => Interlocked.Decrement(ref writers), PromiseEventType.All);
    }

    for (int i = 0; i < readersCount; i++) {
        AsyncPool
            .RunThread(() => {
                int t;
                bool writersDone;
                do {
                    // Sample the writer count BEFORE draining. Checking it
                    // after the drain could miss items enqueued between the
                    // last failed TryDequeue and the check. This assumes the
                    // On callback runs only after the writer body completed.
                    writersDone = Interlocked.CompareExchange(ref writers, 0, 0) == 0;
                    while (queue.TryDequeue(out t))
                        Interlocked.Add(ref total, t);
                } while (!writersDone);
                return 1;
            })
            .On(() => {
                // Only the reader that brings the counter to zero signals.
                if (Interlocked.Decrement(ref readers) == 0)
                    stop.Set();
            }, PromiseEventType.All);
    }

    stop.WaitOne();

    // Every writer enqueues the value 1 exactly itemsPerWriter times.
    Assert.AreEqual(itemsPerWriter * writersCount, total);
}
|
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
283 |
|
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
106
diff
changeset
|
/// <summary>
/// Exercises AsyncQueue: a single-item round trip, in-order dequeue on one
/// thread, then two writers (enqueueing 1s and 2s) and two readers running
/// concurrently. The sum seen by both readers must equal count * 3.
/// </summary>
[TestMethod]
public void AsyncQueueTest() {
    var queue = new AsyncQueue<int>();
    int item;

    // One item in, the same item out, then the queue reports empty.
    queue.Enqueue(10);
    Assert.IsTrue(queue.TryDequeue(out item));
    Assert.AreEqual(10, item);
    Assert.IsFalse(queue.TryDequeue(out item));

    // Items must come back in the order they were enqueued.
    for (int n = 0; n < 1000; n++)
        queue.Enqueue(n);

    for (int expected = 0; expected < 1000; expected++) {
        queue.TryDequeue(out item);
        Assert.AreEqual(expected, item);
    }

    const int count = 10000000;

    // Each reader accumulates into its own variable.
    int sum1 = 0, sum2 = 0;
    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            int sent = 0;
            while (sent < count) {
                queue.Enqueue(1);
                sent++;
            }
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            int sent = 0;
            while (sent < count) {
                queue.Enqueue(2);
                sent++;
            }
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            int value;
            // Spin until this reader has taken exactly 'count' items.
            for (int taken = 0; taken < count; ) {
                if (!queue.TryDequeue(out value))
                    continue;
                taken++;
                sum1 += value;
            }
            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            int value;
            // Spin until this reader has taken exactly 'count' items.
            for (int taken = 0; taken < count; ) {
                if (!queue.TryDequeue(out value))
                    continue;
                taken++;
                sum2 += value;
            }
            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
        }
    )
        .Bundle()
        .Join();

    // count ones plus count twos.
    Assert.AreEqual(count * 3, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        count
    );
}
| 353 | |
/// <summary>
/// Two writers push fixed-size batches (of 1s and of 2s) with EnqueueRange
/// while two readers pull with TryDequeueRange until the shared 'read'
/// counter reaches the total item count. The combined sum must match.
/// </summary>
[TestMethod]
public void AsyncQueueBatchTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 29;
    const int wCount = 400000;
    const int total = wBatch * wCount * 2;  // items written by both writers
    const int summ = wBatch * wCount * 3;   // batches of 1s plus batches of 2s

    int sum1 = 0, sum2 = 0;
    const int rBatch = 111;
    int read = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var batch = new int[wBatch];
            for (int k = 0; k < wBatch; k++)
                batch[k] = 1;

            for (int round = 0; round < wCount; round++)
                queue.EnqueueRange(batch, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[wBatch];
            for (int k = 0; k < wBatch; k++)
                batch[k] = 2;

            for (int round = 0; round < wCount; round++)
                queue.EnqueueRange(batch, 0, wBatch);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];

            while (read < total) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, rBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum1 += chunk[k];
                // 'read' is shared by both readers, hence the atomic add.
                Interlocked.Add(ref read, got);
            }

            Console.WriteLine("done reader #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];

            while (read < total) {
                int got;
                if (!queue.TryDequeueRange(chunk, 0, rBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                // 'read' is shared by both readers, hence the atomic add.
                Interlocked.Add(ref read, got);
            }

            Console.WriteLine("done reader #2: {0} ms", Environment.TickCount - started);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(summ, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        total
    );
}
| 431 | |
/// <summary>
/// Three writers push batches (of 1s, 2s and 3s) with EnqueueRange while a
/// single reader pulls with TryDequeueChunk and tracks the average chunk
/// size it received. The sum of everything read must match.
/// NOTE(review): there is only one reader; it accumulates into the second
/// sum and logs itself as "reader #2", so the first sum always stays 0.
/// </summary>
[TestMethod]
public void AsyncQueueChunkDequeueTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 31;
    const int wCount = 200000;
    const int total = wBatch * wCount * 3;  // items written by three writers
    const int summ = wBatch * wCount * 6;   // 1 + 2 + 3 per batch slot

    int sum1 = 0, sum2 = 0;
    const int rBatch = 1024;
    int read = 0;

    var started = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var batch = new int[wBatch];
            for (int k = 0; k < wBatch; k++)
                batch[k] = 1;

            for (int round = 0; round < wCount; round++)
                queue.EnqueueRange(batch, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[wBatch];
            for (int k = 0; k < wBatch; k++)
                batch[k] = 2;

            for (int round = 0; round < wCount; round++)
                queue.EnqueueRange(batch, 0, wBatch);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - started);
        },
        () => {
            var batch = new int[wBatch];
            for (int k = 0; k < wBatch; k++)
                batch[k] = 3;

            for (int round = 0; round < wCount; round++)
                queue.EnqueueRange(batch, 0, wBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - started);
        },
        () => {
            var chunk = new int[rBatch];
            int chunkNo = 1;
            double avgchunk = 0;
            while (read < total) {
                int got;
                if (!queue.TryDequeueChunk(chunk, 0, rBatch, out got))
                    continue;

                for (int k = 0; k < got; k++)
                    sum2 += chunk[k];
                Interlocked.Add(ref read, got);
                // Running average over the chunks received so far.
                avgchunk = avgchunk*(chunkNo-1)/chunkNo + got/(double)chunkNo;
                chunkNo++;
            }

            Console.WriteLine("done reader #2: {0} ms, avg chunk size: {1}", Environment.TickCount - started, avgchunk);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(summ, sum1 + sum2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - started,
        sum1,
        sum2,
        sum1 + sum2,
        total
    );
}
| 507 | |
/// <summary>
/// Three writers fill the queue with 1s (two via EnqueueRange, one via
/// single Enqueue calls) while two readers repeatedly call Drain until the
/// shared 'read' counter reaches the total. The combined sum must match.
/// </summary>
[TestMethod]
public void AsyncQueueDrainTest() {
    var queue = new AsyncQueue<int>();

    const int wBatch = 11;
    const int wCount = 200000;
    const int total = wBatch * wCount * 3;  // number of items written
    const int summ = wBatch * wCount * 3;   // every item is 1, so sum == count

    int r1 = 0, r2 = 0;
    int read = 0;

    var t1 = Environment.TickCount;

    AsyncPool.RunThread(
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #1: {0} ms", Environment.TickCount - t1);
        },
        () => {
            // Same number of items as the batch writers, one at a time.
            for (int i = 0; i < wCount * wBatch; i++)
                queue.Enqueue(1);
            Console.WriteLine("done writer #2: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var buffer = new int[wBatch];
            for (int i = 0; i < wBatch; i++)
                buffer[i] = 1;

            for (int i = 0; i < wCount; i++)
                queue.EnqueueRange(buffer, 0, wBatch);
            Console.WriteLine("done writer #3: {0} ms", Environment.TickCount - t1);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r1 += buffer[i];
                // 'read' is shared by both readers, hence the atomic add.
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #1: {0} ms, {1} items", Environment.TickCount - t1, count);
        },
        () => {
            var count = 0;
            while (read < total) {
                var buffer = queue.Drain();
                for (int i = 0; i < buffer.Length; i++)
                    r2 += buffer[i];
                // 'read' is shared by both readers, hence the atomic add.
                Interlocked.Add(ref read, buffer.Length);
                count += buffer.Length;
            }
            Console.WriteLine("done reader #2: {0} ms, {1} items", Environment.TickCount - t1, count);
        }
    )
        .Bundle()
        .Join();

    Assert.AreEqual(summ, r1 + r2);

    Console.WriteLine(
        "done: {0} ms, summ#1: {1}, summ#2: {2}, total: {3}, count: {4}",
        Environment.TickCount - t1,
        r1,
        r2,
        r1 + r2,
        total
    );
}
|
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
610 |
|
0c8685c8b56b
minor fixes and improvements of AsyncQueue, additional tests
cin
parents:
121
diff
changeset
|
/// <summary>
/// Maps 100000 random doubles through sin(x*x) with ParallelMap (second
/// argument 4) and compares every result with the same expression computed
/// on the calling thread.
/// </summary>
[TestMethod]
public void ParallelMapTest() {

    const int count = 100000;

    var inputs = new double[count];
    var rng = new Random();

    for (int idx = 0; idx < count; idx++)
        inputs[idx] = rng.NextDouble();

    var mapStarted = Environment.TickCount;
    var mapped = inputs.ParallelMap(x => Math.Sin(x*x), 4).Join();

    Console.WriteLine("Map complete in {0} ms", Environment.TickCount - mapStarted);

    var verifyStarted = Environment.TickCount;
    for (int idx = 0; idx < count; idx++) {
        var input = inputs[idx];
        Assert.AreEqual(Math.Sin(input * input), mapped[idx]);
    }
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - verifyStarted);
}
| 632 | |
/// <summary>
/// Maps 10000 random doubles through sin(x*x) with ChainedMap, where each
/// element is computed by a job submitted to a dedicated WorkerPool, and
/// compares every result with the same expression computed locally.
/// </summary>
[TestMethod]
public void ChainedMapTest() {

    using (var pool = new WorkerPool()) {
        const int count = 10000;

        var inputs = new double[count];
        var rng = new Random();

        for (int idx = 0; idx < count; idx++)
            inputs[idx] = rng.NextDouble();

        var mapStarted = Environment.TickCount;
        var mapping = inputs.ChainedMap(
            // Analysis disable once AccessToDisposedClosure
            x => pool.Invoke(
                () => Math.Sin(x * x)
            ),
            4
        );
        // Join completes inside the using block, before the pool is disposed.
        var mapped = mapping.Join();

        Console.WriteLine("Map complete in {0} ms", Environment.TickCount - mapStarted);

        var verifyStarted = Environment.TickCount;
        for (int idx = 0; idx < count; idx++) {
            var input = inputs[idx];
            Assert.AreEqual(Math.Sin(input * input), mapped[idx]);
        }
        Console.WriteLine("Verified in {0} ms", Environment.TickCount - verifyStarted);
        Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads);
    }
}
| 665 | |
/// <summary>
/// Sums 100000 random integers with ParallelForEach (second argument 4)
/// using an atomic add, and compares the result with a plain sequential sum.
/// </summary>
[TestMethod]
public void ParallelForEachTest() {

    const int count = 100000;

    var values = new int[count];
    var rng = new Random();

    for (int idx = 0; idx < count; idx++)
        values[idx] = (int)(rng.NextDouble() * 100);

    int parallelSum = 0;

    var iterationStarted = Environment.TickCount;
    values.ParallelForEach(x => Interlocked.Add(ref parallelSum, x), 4).Join();

    Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - iterationStarted, parallelSum);

    // Reference value computed on the calling thread.
    int sequentialSum = 0;

    var verifyStarted = Environment.TickCount;
    foreach (var value in values)
        sequentialSum += value;
    Assert.AreEqual(sequentialSum, parallelSum);
    Console.WriteLine("Verified in {0} ms", Environment.TickCount - verifyStarted);
}
| 692 | |
/// <summary>
/// Cancels a two-step promise chain after its first step has been joined and checks
/// which of the three <c>PromiseEventType.Cancelled</c> handlers were invoked.
/// </summary>
[TestMethod]
public void ComplexCase1Test() {
    // cancelled[i] is raised by the Cancelled handler attached to stage i
    var cancelled = new bool[3];

    // op1 (async 200ms) => op2 (async 200ms) => op3 (sync map)

    var firstSleep = PromiseHelper.Sleep(200, "Alan");
    var step1 = firstSleep.On(() => cancelled[0] = true, PromiseEventType.Cancelled);

    var chained = step1.Chain(name =>
        PromiseHelper
            .Sleep(200, "Hi, " + name)
            .Then(greeting => greeting)
            .On(() => cancelled[1] = true, PromiseEventType.Cancelled)
    );
    var p = chained.On(() => cancelled[2] = true, PromiseEventType.Cancelled);

    // wait for the first step before cancelling the chain
    step1.Join();
    p.Cancel();

    try {
        Assert.AreEqual(p.Join(), "Hi, Alan");
        Assert.Fail("Shouldn't get here");
    } catch (OperationCanceledException) {
        // expected outcome: joining the cancelled chain throws
    }

    // the already joined first step must not report cancellation; the nested and outer promises must
    Assert.IsFalse(cancelled[0]);
    Assert.IsTrue(cancelled[1]);
    Assert.IsTrue(cancelled[2]);
}
| 722 | |
/// <summary>
/// Starts a nested operation from a <c>Then</c> handler, cancels it from a second
/// timer, and inspects the exception raised when the outer promise is joined.
/// </summary>
[TestMethod]
public void ChainedCancel1Test() {
    // when a chained asynchronous operation is cancelled, the whole promise must
    // complete with an OperationCanceledException error
    var p = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Then(greeting => {
            // start two asynchronous operations
            var hem = PromiseHelper.Sleep(1000, "HEM ENABLED!!!");
            // the second operation cancels the first one before it completes
            var stopper = PromiseHelper.Sleep(100, "HAL, STOP!");
            stopper.Then(hem.Cancel);
            return hem;
        });

    // NOTE(review): nothing is asserted when Join() returns normally or throws a
    // different exception type - confirm that this is intended
    try {
        p.Join();
    } catch (TargetInvocationException err) {
        var cause = err.InnerException;
        Assert.IsTrue(cause is OperationCanceledException);
    }
}
| 744 | |
/// <summary>
/// Cancels an outer promise chain once its nested operation has started and verifies
/// that the nested operation observes the cancellation.
/// </summary>
[TestMethod]
public void ChainedCancel2Test() {
    // when a chain of promises is cancelled, the nested operations must be cancelled as well
    var nestedCancelled = new Promise<bool>();
    var nestedStarted = new Signal();

    var chain = PromiseHelper
        .Sleep(1, "Hi, HAL!")
        .Chain(() => {
            nestedStarted.Set();
            // start two asynchronous operations
            var nested = PromiseHelper
                .Sleep(2000, "HEM ENABLED!!!")
                .Then(() => nestedCancelled.Resolve(false));

            // resolved with true only when the nested operation is cancelled
            nested.On(() => nestedCancelled.Resolve(true), PromiseEventType.Cancelled);

            return nested;
        });

    // do not cancel before the Chain callback has actually run
    nestedStarted.Wait();
    chain.Cancel();

    try {
        chain.Join();
        Assert.Fail();
    } catch (OperationCanceledException) {
        // expected outcome: joining the cancelled chain throws
    }

    Assert.IsTrue(nestedCancelled.Join());
}
| 136 | 775 |
/// <summary>
/// Exercises <c>SharedLock</c> from three threads started with <c>AsyncPool.RunThread</c>:
/// two readers (the second one upgrades to a writer) and one writer. Progress messages
/// are collected in a queue and printed when the test finishes, whether it passes or fails.
/// </summary>
[TestMethod]
public void SharedLockTest() {
    var l = new SharedLock();
    int shared = 0;     // number of readers currently inside the shared section
    int exclusive = 0;  // number of writers currently inside the exclusive section
    var s1 = new Signal();
    var log = new AsyncQueue<string>();

    try {
        AsyncPool.RunThread(
            () => {
                log.Enqueue("Reader #1 started");
                // acquire before entering try so that a failed acquisition never reaches Release()
                l.LockShared();
                try {
                    log.Enqueue("Reader #1 lock got");
                    // the second reader to arrive sets the signal; both readers wait on it
                    if (Interlocked.Increment(ref shared) == 2)
                        s1.Set();
                    s1.Wait();
                    log.Enqueue("Reader #1 finished");
                    Interlocked.Decrement(ref shared);
                } finally {
                    l.Release();
                    log.Enqueue("Reader #1 lock released");
                }
            },
            () => {
                log.Enqueue("Reader #2 started");

                // acquire before entering try so that a failed acquisition never reaches Release()
                l.LockShared();
                try {
                    log.Enqueue("Reader #2 lock got");

                    if (Interlocked.Increment(ref shared) == 2)
                        s1.Set();
                    s1.Wait();
                    log.Enqueue("Reader #2 upgrading to writer");
                    Interlocked.Decrement(ref shared);
                    // Upgrade() stays inside try: the shared lock is already held at this point
                    l.Upgrade();
                    log.Enqueue("Reader #2 upgraded");

                    // after the upgrade this thread must be the only one inside the lock
                    Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                    Assert.AreEqual(0, shared);
                    log.Enqueue("Reader #2 finished");
                    Interlocked.Decrement(ref exclusive);
                } finally {
                    l.Release();
                    log.Enqueue("Reader #2 lock released");
                }
            },
            () => {
                log.Enqueue("Writer #1 started");
                // acquire before entering try so that a failed acquisition never reaches Release()
                l.LockExclusive();
                try {
                    log.Enqueue("Writer #1 got the lock");
                    Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
                    Interlocked.Decrement(ref exclusive);
                    log.Enqueue("Writer #1 is finished");
                } finally {
                    l.Release();
                    log.Enqueue("Writer #1 lock released");
                }
            }
        ).Bundle().Join(1000);
        log.Enqueue("Done");
    } catch (Exception error) {
        log.Enqueue(error.Message);
        throw;
    } finally {
        // always dump the collected log to simplify diagnosing a failure
        foreach (var m in log)
            Console.WriteLine(m);
    }
}
| 151 | 848 |
| 849 #if NET_4_5 | |
| 850 | |
/// <summary>
/// Verifies that a resolved <c>Promise&lt;int&gt;</c> can be awaited and yields its value.
/// </summary>
/// <returns>
/// A task the test runner can await; with <c>async void</c> the runner could not observe
/// a failure raised after the <c>await</c>.
/// </returns>
[TestMethod]
public async System.Threading.Tasks.Task TaskInteropTest() {
    var promise = new Promise<int>();
    promise.Resolve(10);
    var res = await promise;

    Assert.AreEqual(10, res);
}
| 859 | |
| 860 #endif | |
| 77 | 861 } |
| 862 } | |
| 863 |
