Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 72:d67b95eddaf4 v2
promises refactoring
author | cin |
---|---|
date | Thu, 04 Sep 2014 18:47:12 +0400 |
parents | d9d794b61bb9 |
children | 4439140706d0 |
rev | line source |
---|---|
41 | 1 using Implab.Diagnostics; |
2 using System; | |
15 | 3 using System.Collections.Generic; |
4 using System.Diagnostics; | |
5 using System.Linq; | |
6 using System.Text; | |
7 using System.Threading; | |
8 | |
9 namespace Implab.Parallels { | |
10 public static class ArrayTraits { | |
11 class ArrayIterator<TSrc> : DispatchPool<int> { | |
12 readonly Action<TSrc> m_action; | |
13 readonly TSrc[] m_source; | |
14 readonly Promise<int> m_promise = new Promise<int>(); | |
41 | 15 readonly TraceContext m_traceContext; |
15 | 16 |
17 int m_pending; | |
18 int m_next; | |
19 | |
20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | |
21 : base(threads) { | |
22 | |
23 Debug.Assert(source != null); | |
24 Debug.Assert(action != null); | |
25 | |
41 | 26 m_traceContext = TraceContext.Snapshot(); |
15 | 27 m_next = 0; |
28 m_source = source; | |
29 m_pending = source.Length; | |
30 m_action = action; | |
31 | |
32 m_promise.Anyway(() => Dispose()); | |
33 m_promise.Cancelled(() => Dispose()); | |
34 | |
35 InitPool(); | |
36 } | |
37 | |
38 public Promise<int> Promise { | |
39 get { | |
40 return m_promise; | |
41 } | |
42 } | |
43 | |
41 | 44 protected override void Worker() { |
48 | 45 TraceContext.Fork(m_traceContext); |
41 | 46 base.Worker(); |
47 } | |
48 | |
15 | 49 protected override bool TryDequeue(out int unit) { |
16 | 50 unit = Interlocked.Increment(ref m_next) - 1; |
51 return unit >= m_source.Length ? false : true; | |
15 | 52 } |
53 | |
54 protected override void InvokeUnit(int unit) { | |
55 try { | |
56 m_action(m_source[unit]); | |
16 | 57 var pending = Interlocked.Decrement(ref m_pending); |
15 | 58 if (pending == 0) |
59 m_promise.Resolve(m_source.Length); | |
60 } catch (Exception e) { | |
61 m_promise.Reject(e); | |
62 } | |
63 } | |
64 } | |
65 | |
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | |
67 readonly Func<TSrc, TDst> m_transform; | |
68 readonly TSrc[] m_source; | |
69 readonly TDst[] m_dest; | |
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | |
41 | 71 readonly TraceContext m_traceContext; |
15 | 72 |
73 int m_pending; | |
74 int m_next; | |
75 | |
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | |
77 : base(threads) { | |
78 | |
79 Debug.Assert (source != null); | |
80 Debug.Assert( transform != null); | |
81 | |
82 m_next = 0; | |
83 m_source = source; | |
84 m_dest = new TDst[source.Length]; | |
85 m_pending = source.Length; | |
86 m_transform = transform; | |
41 | 87 m_traceContext = TraceContext.Snapshot(); |
15 | 88 |
89 m_promise.Anyway(() => Dispose()); | |
90 m_promise.Cancelled(() => Dispose()); | |
91 | |
92 InitPool(); | |
93 } | |
94 | |
95 public Promise<TDst[]> Promise { | |
96 get { | |
97 return m_promise; | |
98 } | |
99 } | |
100 | |
41 | 101 protected override void Worker() { |
48 | 102 TraceContext.Fork(m_traceContext); |
41 | 103 base.Worker(); |
104 } | |
105 | |
15 | 106 protected override bool TryDequeue(out int unit) { |
16 | 107 unit = Interlocked.Increment(ref m_next) - 1; |
108 return unit >= m_source.Length ? false : true; | |
15 | 109 } |
110 | |
111 protected override void InvokeUnit(int unit) { | |
112 try { | |
113 m_dest[unit] = m_transform(m_source[unit]); | |
16 | 114 var pending = Interlocked.Decrement(ref m_pending); |
15 | 115 if (pending == 0) |
116 m_promise.Resolve(m_dest); | |
117 } catch (Exception e) { | |
118 m_promise.Reject(e); | |
119 } | |
120 } | |
121 } | |
122 | |
30 | 123 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { |
15 | 124 if (source == null) |
125 throw new ArgumentNullException("source"); | |
126 if (transform == null) | |
127 throw new ArgumentNullException("transform"); | |
128 | |
129 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); | |
130 return mapper.Promise; | |
131 } | |
132 | |
30 | 133 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { |
15 | 134 if (source == null) |
135 throw new ArgumentNullException("source"); | |
136 if (action == null) | |
137 throw new ArgumentNullException("action"); | |
138 | |
139 var iter = new ArrayIterator<TSrc>(source, action, threads); | |
140 return iter.Promise; | |
141 } | |
16 | 142 |
26 | 143 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { |
16 | 144 if (source == null) |
145 throw new ArgumentNullException("source"); | |
146 if (transform == null) | |
147 throw new ArgumentNullException("transform"); | |
148 if (threads <= 0) | |
149 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
150 | |
32 | 151 if (source.Length == 0) |
152 return Promise<TDst[]>.ResultToPromise(new TDst[0]); | |
153 | |
16 | 154 var promise = new Promise<TDst[]>(); |
155 var res = new TDst[source.Length]; | |
156 var pending = source.Length; | |
30 | 157 |
16 | 158 var semaphore = new Semaphore(threads, threads); |
159 | |
160 AsyncPool.InvokeNewThread(() => { | |
161 for (int i = 0; i < source.Length; i++) { | |
19
e3935fdf59a2
Promise is rewritten to use interlocked operations instead of locks
cin
parents:
16
diff
changeset
|
162 if(promise.IsResolved) |
16 | 163 break; // stop processing in case of error or cancellation |
164 var idx = i; | |
165 semaphore.WaitOne(); | |
166 try { | |
167 var p1 = transform(source[i]); | |
168 p1.Anyway(() => semaphore.Release()); | |
169 p1.Cancelled(() => semaphore.Release()); | |
170 p1.Then( | |
171 x => { | |
172 res[idx] = x; | |
173 var left = Interlocked.Decrement(ref pending); | |
174 if (left == 0) | |
175 promise.Resolve(res); | |
176 }, | |
72 | 177 e => { |
178 promise.Reject(e); | |
179 throw new TransientPromiseException(e); | |
180 } | |
16 | 181 ); |
182 | |
183 } catch (Exception e) { | |
184 promise.Reject(e); | |
185 } | |
186 } | |
187 return 0; | |
188 }); | |
189 | |
190 return promise.Anyway(() => semaphore.Dispose()); | |
191 } | |
24 | 192 |
15 | 193 } |
194 } |