annotate Implab/Parallels/ArrayTraits.cs @ 33:b255e4aeef17

removed the reference to the parent from the promise object this allows resolved promises to release parents and results they are holding. Added complete set of operations to IPromiseBase interface Subscribing to the cancellation event of the promise should not affect it's IsExclusive property More tests.
author cin
date Thu, 10 Apr 2014 02:39:29 +0400
parents 8eca2652d2ff
children 2fc0fbe7d58b
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
1 using System;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
2 using System.Collections.Generic;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
3 using System.Diagnostics;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
4 using System.Linq;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
5 using System.Text;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
6 using System.Threading;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
7
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
8 namespace Implab.Parallels {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
9 public static class ArrayTraits {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
10 class ArrayIterator<TSrc> : DispatchPool<int> {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
11 readonly Action<TSrc> m_action;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
12 readonly TSrc[] m_source;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
13 readonly Promise<int> m_promise = new Promise<int>();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
14
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
15 int m_pending;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
16 int m_next;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
17
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
19 : base(threads) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
20
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
21 Debug.Assert(source != null);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
22 Debug.Assert(action != null);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
23
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
24 m_next = 0;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
25 m_source = source;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
26 m_pending = source.Length;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
27 m_action = action;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
28
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
29 m_promise.Anyway(() => Dispose());
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
30 m_promise.Cancelled(() => Dispose());
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
31
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
32 InitPool();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
33 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
34
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
35 public Promise<int> Promise {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
36 get {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
37 return m_promise;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
38 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
39 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
40
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
41 protected override bool TryDequeue(out int unit) {
16
cin
parents: 15
diff changeset
42 unit = Interlocked.Increment(ref m_next) - 1;
cin
parents: 15
diff changeset
43 return unit >= m_source.Length ? false : true;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
44 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
45
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
46 protected override void InvokeUnit(int unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
47 try {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
48 m_action(m_source[unit]);
16
cin
parents: 15
diff changeset
49 var pending = Interlocked.Decrement(ref m_pending);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
50 if (pending == 0)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
51 m_promise.Resolve(m_source.Length);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
52 } catch (Exception e) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
53 m_promise.Reject(e);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
54 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
55 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
56 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
57
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
59 readonly Func<TSrc, TDst> m_transform;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
60 readonly TSrc[] m_source;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
61 readonly TDst[] m_dest;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
63
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
64 int m_pending;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
65 int m_next;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
66
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
68 : base(threads) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
69
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
70 Debug.Assert (source != null);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
71 Debug.Assert( transform != null);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
72
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
73 m_next = 0;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
74 m_source = source;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
75 m_dest = new TDst[source.Length];
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
76 m_pending = source.Length;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
77 m_transform = transform;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
78
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
79 m_promise.Anyway(() => Dispose());
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
80 m_promise.Cancelled(() => Dispose());
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
81
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
82 InitPool();
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
83 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
84
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
85 public Promise<TDst[]> Promise {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
86 get {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
87 return m_promise;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
88 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
89 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
90
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
91 protected override bool TryDequeue(out int unit) {
16
cin
parents: 15
diff changeset
92 unit = Interlocked.Increment(ref m_next) - 1;
cin
parents: 15
diff changeset
93 return unit >= m_source.Length ? false : true;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
94 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
95
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
96 protected override void InvokeUnit(int unit) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
97 try {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
98 m_dest[unit] = m_transform(m_source[unit]);
16
cin
parents: 15
diff changeset
99 var pending = Interlocked.Decrement(ref m_pending);
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
100 if (pending == 0)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
101 m_promise.Resolve(m_dest);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
102 } catch (Exception e) {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
103 m_promise.Reject(e);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
104 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
105 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
106 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
107
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 26
diff changeset
108 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
109 if (source == null)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
110 throw new ArgumentNullException("source");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
111 if (transform == null)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
112 throw new ArgumentNullException("transform");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
113
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
115 return mapper.Promise;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
116 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
117
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 26
diff changeset
118 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
119 if (source == null)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
120 throw new ArgumentNullException("source");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
121 if (action == null)
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
122 throw new ArgumentNullException("action");
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
123
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
124 var iter = new ArrayIterator<TSrc>(source, action, threads);
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
125 return iter.Promise;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
126 }
16
cin
parents: 15
diff changeset
127
26
f0bf98e4d22c refactoring
cin
parents: 25
diff changeset
128 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
16
cin
parents: 15
diff changeset
129 if (source == null)
cin
parents: 15
diff changeset
130 throw new ArgumentNullException("source");
cin
parents: 15
diff changeset
131 if (transform == null)
cin
parents: 15
diff changeset
132 throw new ArgumentNullException("transform");
cin
parents: 15
diff changeset
133 if (threads <= 0)
cin
parents: 15
diff changeset
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
cin
parents: 15
diff changeset
135
32
8eca2652d2ff fixed: StackOverflow in IPromiseBase.Then(handler)
cin
parents: 30
diff changeset
136 if (source.Length == 0)
8eca2652d2ff fixed: StackOverflow in IPromiseBase.Then(handler)
cin
parents: 30
diff changeset
137 return Promise<TDst[]>.ResultToPromise(new TDst[0]);
8eca2652d2ff fixed: StackOverflow in IPromiseBase.Then(handler)
cin
parents: 30
diff changeset
138
16
cin
parents: 15
diff changeset
139 var promise = new Promise<TDst[]>();
cin
parents: 15
diff changeset
140 var res = new TDst[source.Length];
cin
parents: 15
diff changeset
141 var pending = source.Length;
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 26
diff changeset
142
16
cin
parents: 15
diff changeset
143 var semaphore = new Semaphore(threads, threads);
cin
parents: 15
diff changeset
144
cin
parents: 15
diff changeset
145 AsyncPool.InvokeNewThread(() => {
cin
parents: 15
diff changeset
146 for (int i = 0; i < source.Length; i++) {
19
e3935fdf59a2 Promise is rewritten to use interlocked operations instead of locks
cin
parents: 16
diff changeset
147 if(promise.IsResolved)
16
cin
parents: 15
diff changeset
148 break; // stop processing in case of error or cancellation
cin
parents: 15
diff changeset
149 var idx = i;
cin
parents: 15
diff changeset
150 semaphore.WaitOne();
cin
parents: 15
diff changeset
151 try {
cin
parents: 15
diff changeset
152 var p1 = transform(source[i]);
cin
parents: 15
diff changeset
153 p1.Anyway(() => semaphore.Release());
cin
parents: 15
diff changeset
154 p1.Cancelled(() => semaphore.Release());
cin
parents: 15
diff changeset
155 p1.Then(
cin
parents: 15
diff changeset
156 x => {
cin
parents: 15
diff changeset
157 res[idx] = x;
cin
parents: 15
diff changeset
158 var left = Interlocked.Decrement(ref pending);
cin
parents: 15
diff changeset
159 if (left == 0)
cin
parents: 15
diff changeset
160 promise.Resolve(res);
cin
parents: 15
diff changeset
161 },
cin
parents: 15
diff changeset
162 e => promise.Reject(e)
cin
parents: 15
diff changeset
163 );
cin
parents: 15
diff changeset
164
cin
parents: 15
diff changeset
165 } catch (Exception e) {
cin
parents: 15
diff changeset
166 promise.Reject(e);
cin
parents: 15
diff changeset
167 }
cin
parents: 15
diff changeset
168 }
cin
parents: 15
diff changeset
169 return 0;
cin
parents: 15
diff changeset
170 });
cin
parents: 15
diff changeset
171
cin
parents: 15
diff changeset
172 return promise.Anyway(() => semaphore.Dispose());
cin
parents: 15
diff changeset
173 }
24
ee04e1fa78da fixed dispatch pool race condition
cin
parents: 19
diff changeset
174
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
175 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
176 }