annotate Implab/Parallels/ArrayTraits.cs @ 196:40d7fed4a09e

fixed promise chaining behavior, the error handler doesn't handle result or cancellation handlers exceptions these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents 706fccb85524
children cbe10ac0731e
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
41
2fc0fbe7d58b Added TraceContext support to array traits
cin
parents: 32
diff changeset
1 using Implab.Diagnostics;
2fc0fbe7d58b Added TraceContext support to array traits
cin
parents: 32
diff changeset
2 using System;
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
3 using System.Diagnostics;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
4 using System.Threading;
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
5
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
6 namespace Implab.Parallels {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
7 public static class ArrayTraits {
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Dispatch pool whose work units are indexes into a source array; every
/// index handed out has the supplied action applied to the matching element.
/// The exposed promise is resolved with the array length once the action has
/// completed for every element, or rejected with the first exception caught.
/// </summary>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    readonly LogicalOperation m_logicalOperation;

    // number of elements whose action has not completed successfully yet
    int m_pending;
    // counter used to hand out array indexes, one per TryDequeue call
    int m_next;

    /// <summary>
    /// Captures the source array and the action, remembers the current
    /// logical operation of the trace context and starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate; must not be null.</param>
    /// <param name="action">Action applied to every element; must not be null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // the pool is disposed as soon as the promise completes in any way
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the length of the source array when all
    /// elements have been processed.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker inside the logical operation captured by the
    /// constructor, leaving it again even when the base worker throws.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Claims the next array index atomically.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns><c>true</c> while the claimed index is inside the array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Applies the action to the element at the given index and resolves
    /// the promise when this was the last outstanding element.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            var item = m_source[unit];
            m_action(item);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception error) {
            // a failed element is never counted as completed, the failure
            // is reported through the promise instead
            m_promise.Reject(error);
        }
    }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
65
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
/// <summary>
/// Dispatch pool whose work units are indexes into a source array; every
/// index handed out has the transform applied to the matching element and
/// the result stored at the same index of the destination array. The exposed
/// promise is resolved with the destination array once every element has
/// been transformed, or rejected with the first exception caught.
/// </summary>
class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly LogicalOperation m_logicalOperation;

    // number of elements which have not been transformed successfully yet
    int m_pending;
    // counter used to hand out array indexes, one per TryDequeue call
    int m_next;

    /// <summary>
    /// Captures the source array and the transform, allocates the
    /// destination array, remembers the current logical operation of the
    /// trace context and starts the pool.
    /// </summary>
    /// <param name="source">Array to map; must not be null.</param>
    /// <param name="transform">Function applied to every element; must not be null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_next = 0;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // the pool is disposed as soon as the promise completes in any way
        m_promise.On(Dispose, PromiseEventType.All);

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array when all elements have
    /// been transformed.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker inside the logical operation captured by the
    /// constructor, leaving it again even when the base worker throws.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Claims the next array index atomically.
    /// </summary>
    /// <param name="unit">Receives the claimed index.</param>
    /// <returns><c>true</c> while the claimed index is inside the array.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at the given index, stores the result and
    /// resolves the promise when this was the last outstanding element.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            var mapped = m_transform(m_source[unit]);
            m_dest[unit] = mapped;
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception error) {
            // a failed element is never counted as completed, the failure
            // is reported through the promise instead
            m_promise.Reject(error);
        }
    }
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
125
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 26
diff changeset
/// <summary>
/// Applies <paramref name="transform"/> to every element of
/// <paramref name="source"/> using an <c>ArrayMapper</c> pool and returns a
/// promise for the array of results (results keep the index of their input).
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function applied to every element.</param>
/// <param name="threads">Value passed to the mapper pool; not inspected when the source is empty.</param>
/// <returns>Promise resolved with the mapped array, or rejected when the transform throws.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // The mapper resolves its promise only after processing an element, so
    // with an empty array nothing would ever resolve it. Return a completed
    // promise right away, the same way ChainedMap treats empty input.
    if (source.Length == 0)
        return Promise<TDst[]>.FromResult(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
135
30
2fad2d1f4b03 small refactoring, cleanup.
cin
parents: 26
diff changeset
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using an <c>ArrayIterator</c> pool and returns a
/// promise for the number of processed elements.
/// </summary>
/// <param name="source">Array to iterate.</param>
/// <param name="action">Action invoked for every element.</param>
/// <param name="threads">Value passed to the iterator pool; not inspected when the source is empty.</param>
/// <returns>Promise resolved with the length of the array, or rejected when the action throws.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="action"/> is null.</exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // The iterator resolves its promise only after processing an element,
    // so with an empty array nothing would ever resolve it. Report zero
    // processed elements right away instead of starting a pool.
    if (source.Length == 0)
        return Promise<int>.FromResult(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
16
cin
parents: 15
diff changeset
145
101
279e226dffdd code cleanup
cin
parents: 92
diff changeset
/// <summary>
/// Starts an asynchronous <paramref name="transform"/> for every element of
/// <paramref name="source"/>, keeping at most <paramref name="threads"/> of
/// them in flight, and returns a promise for the array of their results
/// (results keep the index of their input).
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function which starts the operation for an element and returns its promise.</param>
/// <param name="threads">Maximum number of operations allowed to run at the same time; must be positive.</param>
/// <returns>
/// Promise resolved with the mapped array when every operation succeeded,
/// or rejected when the transform throws or one of its promises fails.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="transform"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="threads"/> is zero or negative.</exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");

    if (source.Length == 0)
        return Promise<TDst[]>.FromResult(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    object locker = new object();
    int slots = threads;

    // The slot of an operation is given back only by its result handler
    // below (NOTE(review): On(Action<T>) is presumably not invoked for a
    // failed operation - confirm). Without this wake-up the producer thread
    // could stay blocked in Monitor.Wait forever after a failure, because
    // nobody would pulse the monitor any more.
    promise.On(() => {
        lock (locker) {
            Monitor.Pulse(locker);
        }
    }, PromiseEventType.All);

    // Analysis disable AccessToDisposedClosure
    AsyncPool.RunThread<int>(() => {
        for (int i = 0; i < source.Length; i++) {
            if(promise.IsResolved)
                break; // stop processing in case of error or cancellation
            var idx = i;

            // take a slot; a negative counter means all slots are busy
            if (Interlocked.Decrement(ref slots) < 0) {
                lock(locker) {
                    // wait for a free slot or for the overall completion
                    while(slots < 0 && !promise.IsResolved)
                        Monitor.Wait(locker);
                }
                if (promise.IsResolved)
                    break; // woken up by the completion, nothing left to start
            }

            try {
                transform(source[i])
                    .On( x => {
                        // give the slot back and wake the producer
                        Interlocked.Increment(ref slots);
                        lock (locker) {
                            Monitor.Pulse(locker);
                        }
                    })
                    .On(
                        x => {
                            res[idx] = x;
                            var left = Interlocked.Decrement(ref pending);
                            if (left == 0)
                                promise.Resolve(res);
                        },
                        promise.Reject
                    );

            } catch (Exception e) {
                // the transform itself failed to start the operation
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise;
}
24
ee04e1fa78da fixed dispatch pool race condition
cin
parents: 19
diff changeset
205
15
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
206 }
0f982f9b7d4d implemented parallel map and foreach for arrays
cin
parents:
diff changeset
207 }