comparison Implab.Test/AsyncTests.cs @ 15:0f982f9b7d4d promises

Implemented parallel map and foreach for arrays; rewrote WorkerPool with MTQueue for better efficiency.
author cin
date Thu, 07 Nov 2013 03:41:32 +0400
parents e943453e5039
children 5a4b735ba669
comparison
equal deleted inserted replaced
14:e943453e5039 15:0f982f9b7d4d
2 using Microsoft.VisualStudio.TestTools.UnitTesting; 2 using Microsoft.VisualStudio.TestTools.UnitTesting;
3 using System.Reflection; 3 using System.Reflection;
4 using System.Threading; 4 using System.Threading;
5 using Implab.Parallels; 5 using Implab.Parallels;
6 6
7 namespace Implab.Test 7 namespace Implab.Test {
8 { 8 [TestClass]
9 [TestClass] 9 public class AsyncTests {
10 public class AsyncTests 10 [TestMethod]
11 { 11 public void ResolveTest() {
12 [TestMethod] 12 int res = -1;
13 public void ResolveTest () 13 var p = new Promise<int>();
14 { 14 p.Then(x => res = x);
15 int res = -1; 15 p.Resolve(100);
16 var p = new Promise<int> (); 16
17 p.Then (x => res = x); 17 Assert.AreEqual(res, 100);
18 p.Resolve (100); 18 }
19 19
20 Assert.AreEqual (res, 100); 20 [TestMethod]
21 } 21 public void RejectTest() {
22 22 int res = -1;
23 [TestMethod] 23 Exception err = null;
24 public void RejectTest () 24
25 { 25 var p = new Promise<int>();
26 int res = -1; 26 p.Then(x => res = x, e => err = e);
27 Exception err = null; 27 p.Reject(new ApplicationException("error"));
28 28
29 var p = new Promise<int> (); 29 Assert.AreEqual(res, -1);
30 p.Then (x => res = x, e => err = e); 30 Assert.AreEqual(err.Message, "error");
31 p.Reject (new ApplicationException ("error")); 31
32 32 }
33 Assert.AreEqual (res, -1); 33
34 Assert.AreEqual (err.Message, "error"); 34 [TestMethod]
35 35 public void JoinSuccessTest() {
36 } 36 var p = new Promise<int>();
37 37 p.Resolve(100);
38 [TestMethod] 38 Assert.AreEqual(p.Join(), 100);
39 public void JoinSuccessTest () 39 }
40 { 40
41 var p = new Promise<int> (); 41 [TestMethod]
42 p.Resolve (100); 42 public void JoinFailTest() {
43 Assert.AreEqual (p.Join (), 100); 43 var p = new Promise<int>();
44 } 44 p.Reject(new ApplicationException("failed"));
45 45
46 [TestMethod] 46 try {
47 public void JoinFailTest () 47 p.Join();
48 { 48 throw new ApplicationException("WRONG!");
49 var p = new Promise<int> (); 49 } catch (TargetInvocationException err) {
50 p.Reject (new ApplicationException ("failed")); 50 Assert.AreEqual(err.InnerException.Message, "failed");
51 51 } catch {
52 try { 52 Assert.Fail("Got wrong excaption");
53 p.Join (); 53 }
54 throw new ApplicationException ("WRONG!"); 54 }
55 } catch (TargetInvocationException err) { 55
56 Assert.AreEqual (err.InnerException.Message, "failed"); 56 [TestMethod]
57 } catch { 57 public void MapTest() {
58 Assert.Fail ("Got wrong excaption"); 58 var p = new Promise<int>();
59 } 59
60 } 60 var p2 = p.Map(x => x.ToString());
61 61 p.Resolve(100);
62 [TestMethod] 62
63 public void MapTest () 63 Assert.AreEqual(p2.Join(), "100");
64 { 64 }
65 var p = new Promise<int> ();
66
67 var p2 = p.Map (x => x.ToString ());
68 p.Resolve (100);
69
70 Assert.AreEqual (p2.Join (), "100");
71 }
72 65
73 [TestMethod] 66 [TestMethod]
74 public void FixErrorTest() { 67 public void FixErrorTest() {
75 var p = new Promise<int>(); 68 var p = new Promise<int>();
76 69
80 73
81 Assert.AreEqual(p2.Join(), 101); 74 Assert.AreEqual(p2.Join(), 101);
82 } 75 }
83 76
84 [TestMethod] 77 [TestMethod]
85 public void ChainTest () 78 public void ChainTest() {
86 { 79 var p1 = new Promise<int>();
87 var p1 = new Promise<int> (); 80
88 81 var p3 = p1.Chain(x => {
89 var p3 = p1.Chain (x => { 82 var p2 = new Promise<string>();
90 var p2 = new Promise<string> (); 83 p2.Resolve(x.ToString());
91 p2.Resolve (x.ToString ()); 84 return p2;
92 return p2; 85 });
93 }); 86
94 87 p1.Resolve(100);
95 p1.Resolve (100); 88
96 89 Assert.AreEqual(p3.Join(), "100");
97 Assert.AreEqual (p3.Join (), "100"); 90 }
98 } 91
99 92 [TestMethod]
100 [TestMethod] 93 public void PoolTest() {
101 public void PoolTest () 94 var pid = Thread.CurrentThread.ManagedThreadId;
102 { 95 var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId);
103 var pid = Thread.CurrentThread.ManagedThreadId; 96
104 var p = AsyncPool.Invoke (() => Thread.CurrentThread.ManagedThreadId); 97 Assert.AreNotEqual(pid, p.Join());
105 98 }
106 Assert.AreNotEqual (pid, p.Join ());
107 }
108 99
109 [TestMethod] 100 [TestMethod]
110 public void WorkerPoolSizeTest() { 101 public void WorkerPoolSizeTest() {
111 var pool = new WorkerPool(5,10); 102 var pool = new WorkerPool(5, 10);
112 103
113 Assert.AreEqual(5, pool.ThreadCount); 104 Assert.AreEqual(5, pool.ThreadCount);
114 105
115 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); 106 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
116 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); 107 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
117 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); 108 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
118 109
119 Assert.AreEqual(5, pool.ThreadCount); 110 Assert.AreEqual(5, pool.ThreadCount);
120 111
121 for (int i = 0; i < 100; i++) 112 for (int i = 0; i < 100; i++)
122 pool.Invoke(() => { Thread.Sleep(1000); return 10; }); 113 pool.Invoke(() => { Thread.Sleep(1000000); return 10; });
114 Thread.Sleep(100);
123 Assert.AreEqual(10, pool.ThreadCount); 115 Assert.AreEqual(10, pool.ThreadCount);
116
117 pool.Dispose();
124 } 118 }
125 119
126 [TestMethod] 120 [TestMethod]
127 public void WorkerPoolCorrectTest() { 121 public void WorkerPoolCorrectTest() {
128 var pool = new WorkerPool(5, 20); 122 var pool = new WorkerPool();
123
124 int iterations = 1000;
125 int pending = iterations;
126 var stop = new ManualResetEvent(false);
129 127
130 var count = 0; 128 var count = 0;
131 for (int i = 0; i < 1000; i++) 129 for (int i = 0; i < iterations; i++) {
132 pool 130 pool
133 .Invoke(() => 1) 131 .Invoke(() => 1)
134 .Then(x => Interlocked.Add(ref count, x)); 132 .Then(x => Interlocked.Add(ref count, x))
135 133 .Then(x => Math.Log10(x))
136 Assert.AreEqual(1000, count); 134 .Anyway(() => {
135 Interlocked.Decrement(ref pending);
136 if (pending == 0)
137 stop.Set();
138 });
139 }
140
141 stop.WaitOne();
142
143 Assert.AreEqual(iterations, count);
144 Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads);
145 pool.Dispose();
146
147 }
148
149 [TestMethod]
150 public void WorkerPoolDisposeTest() {
151 var pool = new WorkerPool(5, 20);
152 Assert.AreEqual(5, pool.ThreadCount);
153 pool.Dispose();
154 Thread.Sleep(100);
155 Assert.AreEqual(0, pool.ThreadCount);
156 pool.Dispose();
137 } 157 }
138 158
139 [TestMethod] 159 [TestMethod]
140 public void MTQueueTest() { 160 public void MTQueueTest() {
141 var queue = new MTQueue<int>(); 161 var queue = new MTQueue<int>();
142 var pool = new WorkerPool(5, 20);
143
144 int res; 162 int res;
145 163
146 queue.Enqueue(10); 164 queue.Enqueue(10);
147 Assert.IsTrue(queue.TryDequeue(out res)); 165 Assert.IsTrue(queue.TryDequeue(out res));
148 Assert.AreEqual(10, res); 166 Assert.AreEqual(10, res);
167 for (int i = 0; i < writersCount; i++) { 185 for (int i = 0; i < writersCount; i++) {
168 Interlocked.Increment(ref writers); 186 Interlocked.Increment(ref writers);
169 var wn = i; 187 var wn = i;
170 AsyncPool 188 AsyncPool
171 .InvokeNewThread(() => { 189 .InvokeNewThread(() => {
172 Console.WriteLine("Started writer: {0}", wn);
173 for (int ii = 0; ii < itemsPerWriter; ii++) { 190 for (int ii = 0; ii < itemsPerWriter; ii++) {
174 queue.Enqueue(1); 191 queue.Enqueue(1);
175 Thread.Sleep(1);
176 } 192 }
177 Console.WriteLine("Stopped writer: {0}", wn);
178 return 1; 193 return 1;
179 }) 194 })
180 .Then(x => Interlocked.Decrement(ref writers) ); 195 .Anyway(() => Interlocked.Decrement(ref writers));
181 } 196 }
182 197
183 for (int i = 0; i < 10; i++) { 198 for (int i = 0; i < 10; i++) {
184 Interlocked.Increment(ref readers); 199 Interlocked.Increment(ref readers);
185 var wn = i; 200 var wn = i;
186 AsyncPool 201 AsyncPool
187 .InvokeNewThread(() => { 202 .InvokeNewThread(() => {
188 int t; 203 int t;
189 Console.WriteLine("Started reader: {0}", wn);
190 do { 204 do {
191 while (queue.TryDequeue(out t)) 205 while (queue.TryDequeue(out t))
192 Interlocked.Add(ref total, t); 206 Interlocked.Add(ref total, t);
193 Thread.Sleep(0);
194 } while (writers > 0); 207 } while (writers > 0);
195 Console.WriteLine("Stopped reader: {0}", wn);
196 return 1; 208 return 1;
197 }) 209 })
198 .Then(x => { 210 .Anyway(() => {
199 Interlocked.Decrement(ref readers); 211 Interlocked.Decrement(ref readers);
200 if (readers == 0) 212 if (readers == 0)
201 stop.Set(); 213 stop.Set();
202 }); 214 });
203 } 215 }
204 216
205 stop.WaitOne(); 217 stop.WaitOne();
206 218
207 Assert.AreEqual(itemsPerWriter * writersCount, total); 219 Assert.AreEqual(itemsPerWriter * writersCount, total);
220 }
221
222 [TestMethod]
223 public void ParallelMapTest() {
224
225 int count = 100000;
226
227 double[] args = new double[count];
228 var rand = new Random();
229
230 for (int i = 0; i < count; i++)
231 args[i] = rand.NextDouble();
232
233 var t = Environment.TickCount;
234 var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join();
235
236 Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t);
237
238 t = Environment.TickCount;
239 for (int i = 0; i < count; i++)
240 Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]);
241 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
242 }
243
244 [TestMethod]
245 public void ParallelForEachTest() {
246
247 int count = 100000;
248
249 int[] args = new int[count];
250 var rand = new Random();
251
252 for (int i = 0; i < count; i++)
253 args[i] = (int)(rand.NextDouble() * 100);
254
255 int result = 0;
256
257 var t = Environment.TickCount;
258 args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join();
259
260 Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result);
261
262 int result2 = 0;
263
264 t = Environment.TickCount;
265 for (int i = 0; i < count; i++)
266 result2 += args[i];
267 Assert.AreEqual(result2, result);
268 Console.WriteLine("Verified in {0} ms", Environment.TickCount - t);
208 } 269 }
209 270
210 [TestMethod] 271 [TestMethod]
211 public void ComplexCase1Test() { 272 public void ComplexCase1Test() {
212 var flags = new bool[3]; 273 var flags = new bool[3];
217 .Sleep(200, "Alan") 278 .Sleep(200, "Alan")
218 .Cancelled(() => flags[0] = true) 279 .Cancelled(() => flags[0] = true)
219 .Chain(x => 280 .Chain(x =>
220 PromiseHelper 281 PromiseHelper
221 .Sleep(200, "Hi, " + x) 282 .Sleep(200, "Hi, " + x)
222 .Map( y => y ) 283 .Map(y => y)
223 .Cancelled(() => flags[1] = true) 284 .Cancelled(() => flags[1] = true)
224 ) 285 )
225 .Cancelled(() => flags[2] = true); 286 .Cancelled(() => flags[2] = true);
226 Thread.Sleep(300); 287 Thread.Sleep(300);
227 p.Cancel(); 288 p.Cancel();
228 try { 289 try {
229 Assert.AreEqual(p.Join(), "Hi, Alan"); 290 Assert.AreEqual(p.Join(), "Hi, Alan");
230 Assert.Fail("Shouldn't get here"); 291 Assert.Fail("Shouldn't get here");
231 } catch(OperationCanceledException) { 292 } catch (OperationCanceledException) {
232 } 293 }
233 294
234 Assert.IsFalse(flags[0]); 295 Assert.IsFalse(flags[0]);
235 Assert.IsTrue(flags[1]); 296 Assert.IsTrue(flags[1]);
236 Assert.IsTrue(flags[2]); 297 Assert.IsTrue(flags[2]);
237 } 298 }
238 } 299 }
239 } 300 }
240 301