Mercurial > pub > ImplabNet
comparison Implab.Test/AsyncTests.cs @ 15:0f982f9b7d4d promises
implemented parallel map and foreach for arrays
rewritten WorkerPool with MTQueue for more efficiency
author | cin |
---|---|
date | Thu, 07 Nov 2013 03:41:32 +0400 |
parents | e943453e5039 |
children | 5a4b735ba669 |
comparison
equal
deleted
inserted
replaced
14:e943453e5039 | 15:0f982f9b7d4d |
---|---|
2 using Microsoft.VisualStudio.TestTools.UnitTesting; | 2 using Microsoft.VisualStudio.TestTools.UnitTesting; |
3 using System.Reflection; | 3 using System.Reflection; |
4 using System.Threading; | 4 using System.Threading; |
5 using Implab.Parallels; | 5 using Implab.Parallels; |
6 | 6 |
7 namespace Implab.Test | 7 namespace Implab.Test { |
8 { | 8 [TestClass] |
9 [TestClass] | 9 public class AsyncTests { |
10 public class AsyncTests | 10 [TestMethod] |
11 { | 11 public void ResolveTest() { |
12 [TestMethod] | 12 int res = -1; |
13 public void ResolveTest () | 13 var p = new Promise<int>(); |
14 { | 14 p.Then(x => res = x); |
15 int res = -1; | 15 p.Resolve(100); |
16 var p = new Promise<int> (); | 16 |
17 p.Then (x => res = x); | 17 Assert.AreEqual(res, 100); |
18 p.Resolve (100); | 18 } |
19 | 19 |
20 Assert.AreEqual (res, 100); | 20 [TestMethod] |
21 } | 21 public void RejectTest() { |
22 | 22 int res = -1; |
23 [TestMethod] | 23 Exception err = null; |
24 public void RejectTest () | 24 |
25 { | 25 var p = new Promise<int>(); |
26 int res = -1; | 26 p.Then(x => res = x, e => err = e); |
27 Exception err = null; | 27 p.Reject(new ApplicationException("error")); |
28 | 28 |
29 var p = new Promise<int> (); | 29 Assert.AreEqual(res, -1); |
30 p.Then (x => res = x, e => err = e); | 30 Assert.AreEqual(err.Message, "error"); |
31 p.Reject (new ApplicationException ("error")); | 31 |
32 | 32 } |
33 Assert.AreEqual (res, -1); | 33 |
34 Assert.AreEqual (err.Message, "error"); | 34 [TestMethod] |
35 | 35 public void JoinSuccessTest() { |
36 } | 36 var p = new Promise<int>(); |
37 | 37 p.Resolve(100); |
38 [TestMethod] | 38 Assert.AreEqual(p.Join(), 100); |
39 public void JoinSuccessTest () | 39 } |
40 { | 40 |
41 var p = new Promise<int> (); | 41 [TestMethod] |
42 p.Resolve (100); | 42 public void JoinFailTest() { |
43 Assert.AreEqual (p.Join (), 100); | 43 var p = new Promise<int>(); |
44 } | 44 p.Reject(new ApplicationException("failed")); |
45 | 45 |
46 [TestMethod] | 46 try { |
47 public void JoinFailTest () | 47 p.Join(); |
48 { | 48 throw new ApplicationException("WRONG!"); |
49 var p = new Promise<int> (); | 49 } catch (TargetInvocationException err) { |
50 p.Reject (new ApplicationException ("failed")); | 50 Assert.AreEqual(err.InnerException.Message, "failed"); |
51 | 51 } catch { |
52 try { | 52 Assert.Fail("Got wrong excaption"); |
53 p.Join (); | 53 } |
54 throw new ApplicationException ("WRONG!"); | 54 } |
55 } catch (TargetInvocationException err) { | 55 |
56 Assert.AreEqual (err.InnerException.Message, "failed"); | 56 [TestMethod] |
57 } catch { | 57 public void MapTest() { |
58 Assert.Fail ("Got wrong excaption"); | 58 var p = new Promise<int>(); |
59 } | 59 |
60 } | 60 var p2 = p.Map(x => x.ToString()); |
61 | 61 p.Resolve(100); |
62 [TestMethod] | 62 |
63 public void MapTest () | 63 Assert.AreEqual(p2.Join(), "100"); |
64 { | 64 } |
65 var p = new Promise<int> (); | |
66 | |
67 var p2 = p.Map (x => x.ToString ()); | |
68 p.Resolve (100); | |
69 | |
70 Assert.AreEqual (p2.Join (), "100"); | |
71 } | |
72 | 65 |
73 [TestMethod] | 66 [TestMethod] |
74 public void FixErrorTest() { | 67 public void FixErrorTest() { |
75 var p = new Promise<int>(); | 68 var p = new Promise<int>(); |
76 | 69 |
80 | 73 |
81 Assert.AreEqual(p2.Join(), 101); | 74 Assert.AreEqual(p2.Join(), 101); |
82 } | 75 } |
83 | 76 |
84 [TestMethod] | 77 [TestMethod] |
85 public void ChainTest () | 78 public void ChainTest() { |
86 { | 79 var p1 = new Promise<int>(); |
87 var p1 = new Promise<int> (); | 80 |
88 | 81 var p3 = p1.Chain(x => { |
89 var p3 = p1.Chain (x => { | 82 var p2 = new Promise<string>(); |
90 var p2 = new Promise<string> (); | 83 p2.Resolve(x.ToString()); |
91 p2.Resolve (x.ToString ()); | 84 return p2; |
92 return p2; | 85 }); |
93 }); | 86 |
94 | 87 p1.Resolve(100); |
95 p1.Resolve (100); | 88 |
96 | 89 Assert.AreEqual(p3.Join(), "100"); |
97 Assert.AreEqual (p3.Join (), "100"); | 90 } |
98 } | 91 |
99 | 92 [TestMethod] |
100 [TestMethod] | 93 public void PoolTest() { |
101 public void PoolTest () | 94 var pid = Thread.CurrentThread.ManagedThreadId; |
102 { | 95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); |
103 var pid = Thread.CurrentThread.ManagedThreadId; | 96 |
104 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); | 97 Assert.AreNotEqual(pid, p.Join()); |
105 | 98 } |
106 Assert.AreNotEqual (pid, p.Join ()); | |
107 } | |
108 | 99 |
109 [TestMethod] | 100 [TestMethod] |
110 public void WorkerPoolSizeTest() { | 101 public void WorkerPoolSizeTest() { |
111 var pool = new WorkerPool(5,10); | 102 var pool = new WorkerPool(5, 10); |
112 | 103 |
113 Assert.AreEqual(5, pool.ThreadCount); | 104 Assert.AreEqual(5, pool.ThreadCount); |
114 | 105 |
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | 106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); |
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | 107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); |
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | 108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); |
118 | 109 |
119 Assert.AreEqual(5, pool.ThreadCount); | 110 Assert.AreEqual(5, pool.ThreadCount); |
120 | 111 |
121 for (int i = 0; i < 100; i++) | 112 for (int i = 0; i < 100; i++) |
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); | 113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); |
114 Thread.Sleep(100); | |
123 Assert.AreEqual(10, pool.ThreadCount); | 115 Assert.AreEqual(10, pool.ThreadCount); |
116 | |
117 pool.Dispose(); | |
124 } | 118 } |
125 | 119 |
126 [TestMethod] | 120 [TestMethod] |
127 public void WorkerPoolCorrectTest() { | 121 public void WorkerPoolCorrectTest() { |
128 var pool = new WorkerPool(5, 20); | 122 var pool = new WorkerPool(); |
123 | |
124 int iterations = 1000; | |
125 int pending = iterations; | |
126 var stop = new ManualResetEvent(false); | |
129 | 127 |
130 var count = 0; | 128 var count = 0; |
131 for (int i = 0; i < 1000; i++) | 129 for (int i = 0; i < iterations; i++) { |
132 pool | 130 pool |
133 .Invoke(() => 1) | 131 .Invoke(() => 1) |
134 .Then(x => Interlocked.Add(ref count, x)); | 132 .Then(x => Interlocked.Add(ref count, x)) |
135 | 133 .Then(x => Math.Log10(x)) |
136 Assert.AreEqual(1000, count); | 134 .Anyway(() => { |
135 Interlocked.Decrement(ref pending); | |
136 if (pending == 0) | |
137 stop.Set(); | |
138 }); | |
139 } | |
140 | |
141 stop.WaitOne(); | |
142 | |
143 Assert.AreEqual(iterations, count); | |
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); | |
145 pool.Dispose(); | |
146 | |
147 } | |
148 | |
149 [TestMethod] | |
150 public void WorkerPoolDisposeTest() { | |
151 var pool = new WorkerPool(5, 20); | |
152 Assert.AreEqual(5, pool.ThreadCount); | |
153 pool.Dispose(); | |
154 Thread.Sleep(100); | |
155 Assert.AreEqual(0, pool.ThreadCount); | |
156 pool.Dispose(); | |
137 } | 157 } |
138 | 158 |
139 [TestMethod] | 159 [TestMethod] |
140 public void MTQueueTest() { | 160 public void MTQueueTest() { |
141 var queue = new MTQueue<int>(); | 161 var queue = new MTQueue<int>(); |
142 var pool = new WorkerPool(5, 20); | |
143 | |
144 int res; | 162 int res; |
145 | 163 |
146 queue.Enqueue(10); | 164 queue.Enqueue(10); |
147 Assert.IsTrue(queue.TryDequeue(out res)); | 165 Assert.IsTrue(queue.TryDequeue(out res)); |
148 Assert.AreEqual(10, res); | 166 Assert.AreEqual(10, res); |
167 for (int i = 0; i < writersCount; i++) { | 185 for (int i = 0; i < writersCount; i++) { |
168 Interlocked.Increment(ref writers); | 186 Interlocked.Increment(ref writers); |
169 var wn = i; | 187 var wn = i; |
170 AsyncPool | 188 AsyncPool |
171 .InvokeNewThread(() => { | 189 .InvokeNewThread(() => { |
172 Console.WriteLine("Started writer: {0}", wn); | |
173 for (int ii = 0; ii < itemsPerWriter; ii++) { | 190 for (int ii = 0; ii < itemsPerWriter; ii++) { |
174 queue.Enqueue(1); | 191 queue.Enqueue(1); |
175 Thread.Sleep(1); | |
176 } | 192 } |
177 Console.WriteLine("Stopped writer: {0}", wn); | |
178 return 1; | 193 return 1; |
179 }) | 194 }) |
180 .Then(x => Interlocked.Decrement(ref writers) ); | 195 .Anyway(() => Interlocked.Decrement(ref writers)); |
181 } | 196 } |
182 | 197 |
183 for (int i = 0; i < 10; i++) { | 198 for (int i = 0; i < 10; i++) { |
184 Interlocked.Increment(ref readers); | 199 Interlocked.Increment(ref readers); |
185 var wn = i; | 200 var wn = i; |
186 AsyncPool | 201 AsyncPool |
187 .InvokeNewThread(() => { | 202 .InvokeNewThread(() => { |
188 int t; | 203 int t; |
189 Console.WriteLine("Started reader: {0}", wn); | |
190 do { | 204 do { |
191 while (queue.TryDequeue(out t)) | 205 while (queue.TryDequeue(out t)) |
192 Interlocked.Add(ref total, t); | 206 Interlocked.Add(ref total, t); |
193 Thread.Sleep(0); | |
194 } while (writers > 0); | 207 } while (writers > 0); |
195 Console.WriteLine("Stopped reader: {0}", wn); | |
196 return 1; | 208 return 1; |
197 }) | 209 }) |
198 .Then(x => { | 210 .Anyway(() => { |
199 Interlocked.Decrement(ref readers); | 211 Interlocked.Decrement(ref readers); |
200 if (readers == 0) | 212 if (readers == 0) |
201 stop.Set(); | 213 stop.Set(); |
202 }); | 214 }); |
203 } | 215 } |
204 | 216 |
205 stop.WaitOne(); | 217 stop.WaitOne(); |
206 | 218 |
207 Assert.AreEqual(itemsPerWriter * writersCount, total); | 219 Assert.AreEqual(itemsPerWriter * writersCount, total); |
220 } | |
221 | |
222 [TestMethod] | |
223 public void ParallelMapTest() { | |
224 | |
225 int count = 100000; | |
226 | |
227 double[] args = new double[count]; | |
228 var rand = new Random(); | |
229 | |
230 for (int i = 0; i < count; i++) | |
231 args[i] = rand.NextDouble(); | |
232 | |
233 var t = Environment.TickCount; | |
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); | |
235 | |
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); | |
237 | |
238 t = Environment.TickCount; | |
239 for (int i = 0; i < count; i++) | |
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); | |
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | |
242 } | |
243 | |
244 [TestMethod] | |
245 public void ParallelForEachTest() { | |
246 | |
247 int count = 100000; | |
248 | |
249 int[] args = new int[count]; | |
250 var rand = new Random(); | |
251 | |
252 for (int i = 0; i < count; i++) | |
253 args[i] = (int)(rand.NextDouble() * 100); | |
254 | |
255 int result = 0; | |
256 | |
257 var t = Environment.TickCount; | |
258 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); | |
259 | |
260 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); | |
261 | |
262 int result2 = 0; | |
263 | |
264 t = Environment.TickCount; | |
265 for (int i = 0; i < count; i++) | |
266 result2 += args[i]; | |
267 Assert.AreEqual(result2, result); | |
268 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); | |
208 } | 269 } |
209 | 270 |
210 [TestMethod] | 271 [TestMethod] |
211 public void ComplexCase1Test() { | 272 public void ComplexCase1Test() { |
212 var flags = new bool[3]; | 273 var flags = new bool[3]; |
217 .Sleep(200, "Alan") | 278 .Sleep(200, "Alan") |
218 .Cancelled(() => flags[0] = true) | 279 .Cancelled(() => flags[0] = true) |
219 .Chain(x => | 280 .Chain(x => |
220 PromiseHelper | 281 PromiseHelper |
221 .Sleep(200, "Hi, " + x) | 282 .Sleep(200, "Hi, " + x) |
222 .Map( y => y ) | 283 .Map(y => y) |
223 .Cancelled(() => flags[1] = true) | 284 .Cancelled(() => flags[1] = true) |
224 ) | 285 ) |
225 .Cancelled(() => flags[2] = true); | 286 .Cancelled(() => flags[2] = true); |
226 Thread.Sleep(300); | 287 Thread.Sleep(300); |
227 p.Cancel(); | 288 p.Cancel(); |
228 try { | 289 try { |
229 Assert.AreEqual(p.Join(), "Hi, Alan"); | 290 Assert.AreEqual(p.Join(), "Hi, Alan"); |
230 Assert.Fail("Shouldn't get here"); | 291 Assert.Fail("Shouldn't get here"); |
231 } catch(OperationCanceledException) { | 292 } catch (OperationCanceledException) { |
232 } | 293 } |
233 | 294 |
234 Assert.IsFalse(flags[0]); | 295 Assert.IsFalse(flags[0]); |
235 Assert.IsTrue(flags[1]); | 296 Assert.IsTrue(flags[1]); |
236 Assert.IsTrue(flags[2]); | 297 Assert.IsTrue(flags[2]); |
237 } | 298 } |
238 } | 299 } |
239 } | 300 } |
240 | 301 |