Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 79:05e6468f066f v2
sync
author | cin |
---|---|
date | Mon, 22 Sep 2014 18:20:49 +0400 |
parents | c761fc982e1d |
children | 4f20870d0816 |
rev | line source |
---|---|
41 | 1 using Implab.Diagnostics; |
2 using System; | |
15 | 3 using System.Collections.Generic; |
4 using System.Diagnostics; | |
5 using System.Linq; | |
6 using System.Text; | |
7 using System.Threading; | |
8 | |
9 namespace Implab.Parallels { | |
10 public static class ArrayTraits { | |
/// <summary>
/// Dispatch pool that invokes an action for every element of an array.
/// Work units are element indices handed out to the pool workers one at a time.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly TSrc[] m_source;
    readonly Action<TSrc> m_action;
    readonly TraceContext m_traceContext;
    readonly Promise<int> m_promise = new Promise<int>();

    // Index of the next element to hand out; advanced atomically in TryDequeue.
    int m_next;
    // Number of elements whose action has not yet completed successfully.
    int m_pending;

    /// <summary>
    /// Captures the inputs and the current trace context, then starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate; must not be null.</param>
    /// <param name="action">Action applied to each element; must not be null.</param>
    /// <param name="threads">Thread count passed to the base pool.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_next = 0;
        m_pending = source.Length;
        m_traceContext = TraceContext.Snapshot();

        // Hook pool disposal to the promise (presumably runs however the
        // promise completes -- confirm against Promise.Anyway).
        m_promise.Anyway(Dispose);

        // Every field a worker reads is assigned before the pool is started.
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the source length once every element has been
    /// processed, or rejected with the first exception passed to Reject.
    /// </summary>
    public Promise<int> Promise {
        get { return m_promise; }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor before running the
    /// base worker loop.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next element index.
    /// </summary>
    /// <param name="unit">The claimed index (may be past the end of the array).</param>
    /// <returns>True when the claimed index refers to an existing element.</returns>
    protected override bool TryDequeue(out int unit) {
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Runs the action for one element and resolves the promise when the last
    /// outstanding element completes; any exception rejects the promise.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
64 | |
/// <summary>
/// Dispatch pool that maps every element of an array through a transform
/// and stores each result at the same index of a destination array.
/// Work units are element indices handed out to the pool workers one at a time.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the result array.</typeparam>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly TraceContext m_traceContext;

    // Number of elements whose transform has not yet completed successfully.
    int m_pending;
    // Index of the next element to hand out; advanced atomically in TryDequeue.
    int m_next;

    /// <summary>
    /// Captures the inputs and the current trace context, allocates the
    /// destination array and starts the pool.
    /// </summary>
    /// <param name="source">Array to map; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Thread count passed to the base pool.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_next = 0;
        m_source = source;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_transform = transform;
        m_traceContext = TraceContext.Snapshot();

        // Hook pool disposal to the promise (presumably runs however the
        // promise completes -- confirm against Promise.Anyway).
        m_promise.Anyway(Dispose);

        // Every field a worker reads is assigned before the pool is started.
        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array once every element has been
    /// mapped, or rejected with the first exception passed to Reject.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Forks the trace context captured in the constructor before running the
    /// base worker loop.
    /// </summary>
    protected override void Worker() {
        TraceContext.Fork(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next element index.
    /// </summary>
    /// <param name="unit">The claimed index (may be past the end of the array).</param>
    /// <returns>True when the claimed index refers to an existing element.</returns>
    protected override bool TryDequeue(out int unit) {
        unit = Interlocked.Increment(ref m_next) - 1;
        // Plain comparison, same form as ArrayIterator.TryDequeue.
        return unit < m_source.Length;
    }

    /// <summary>
    /// Maps one element and resolves the promise when the last outstanding
    /// element completes; any exception rejects the promise.
    /// </summary>
    /// <param name="unit">Index of the element to map.</param>
    protected override void InvokeUnit(int unit) {
        try {
            // Each index is claimed by exactly one TryDequeue call, so no two
            // workers write the same slot of m_dest.
            m_dest[unit] = m_transform(m_source[unit]);
            var pending = Interlocked.Decrement(ref m_pending);
            if (pending == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception e) {
            m_promise.Reject(e);
        }
    }
}
120 | |
/// <summary>
/// Maps every element of <paramref name="source"/> through
/// <paramref name="transform"/> using a pool of worker threads.
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Number of worker threads; must be greater than zero.</param>
/// <returns>
/// A promise for the mapped array; results keep the index of their source element.
/// An empty source yields an already resolved promise with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">source or transform is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">threads is not positive.</exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    // Same validation as ChainedMap: a pool without workers could never complete.
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    // With no elements the mapper never runs a unit, so its promise would
    // never be resolved; return a completed promise instead (as ChainedMap does).
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
130 | |
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using a pool of worker threads.
/// </summary>
/// <param name="source">Array to iterate.</param>
/// <param name="action">Action applied to each element.</param>
/// <param name="threads">Number of worker threads; must be greater than zero.</param>
/// <returns>
/// A promise resolved with the number of elements once all of them have been
/// processed. An empty source yields an already resolved promise with 0.
/// </returns>
/// <exception cref="ArgumentNullException">source or action is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">threads is not positive.</exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");
    // Same validation as ChainedMap: a pool without workers could never complete.
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    // With no elements the iterator never runs a unit, so its promise would
    // never be resolved; return a completed promise carrying the element count.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
16 | 140 |
/// <summary>
/// Maps every element of <paramref name="source"/> through an asynchronous
/// <paramref name="transform"/>, keeping at most <paramref name="threads"/>
/// transforms in flight at a time.
/// </summary>
/// <param name="source">Array to map.</param>
/// <param name="transform">Function returning a promise for the mapped element.</param>
/// <param name="threads">Maximum number of concurrently pending transforms; must be greater than zero.</param>
/// <returns>
/// A promise for the mapped array; results keep the index of their source element.
/// An empty source yields an already resolved promise with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">source or transform is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">threads is not positive.</exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    // The single-string constructor takes the parameter NAME, so the name and
    // the message are passed separately.
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    // Limits the number of transforms that are pending at the same time.
    var semaphore = new Semaphore(threads, threads);

    // NOTE(review): the semaphore is disposed as soon as the result promise
    // completes, while this thread may still wait on it and in-flight item
    // promises may still release it -- confirm this is harmless.
    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if (promise.IsResolved)
                break; // stop processing in case of error or cancellation
            // Per-iteration copy: the callbacks below may run after i has advanced.
            var idx = i;

            semaphore.WaitOne();
            try {
                var p1 = transform(source[i]);
                // Free the slot however the item promise completes.
                p1.Anyway(() => semaphore.Release());
                p1.Then(
                    x => {
                        res[idx] = x;
                        var left = Interlocked.Decrement(ref pending);
                        if (left == 0)
                            promise.Resolve(res);
                    },
                    e => {
                        promise.Reject(e);
                        throw new TransientPromiseException(e);
                    }
                );

            } catch (Exception e) {
                // transform threw synchronously; the slot is not released, but
                // the next iteration sees the rejected promise and leaves the loop.
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise.Anyway(semaphore.Dispose);
}
24 | 191 |
15 | 192 } |
193 } |