Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 75:4439140706d0 v2
major refactoring, added tasks support
author | cin |
---|---|
date | Wed, 10 Sep 2014 11:17:37 +0400 |
parents | d67b95eddaf4 |
children | c761fc982e1d |
comparison
equal
deleted
inserted
replaced
74:c4140283575c | 75:4439140706d0 |
---|---|
27 m_next = 0; | 27 m_next = 0; |
28 m_source = source; | 28 m_source = source; |
29 m_pending = source.Length; | 29 m_pending = source.Length; |
30 m_action = action; | 30 m_action = action; |
31 | 31 |
32 m_promise.Anyway(() => Dispose()); | 32 m_promise.Finally(Dispose); |
33 m_promise.Cancelled(() => Dispose()); | |
34 | 33 |
35 InitPool(); | 34 InitPool(); |
36 } | 35 } |
37 | 36 |
38 public Promise<int> Promise { | 37 public Promise<int> Promise { |
46 base.Worker(); | 45 base.Worker(); |
47 } | 46 } |
48 | 47 |
49 protected override bool TryDequeue(out int unit) { | 48 protected override bool TryDequeue(out int unit) { |
50 unit = Interlocked.Increment(ref m_next) - 1; | 49 unit = Interlocked.Increment(ref m_next) - 1; |
51 return unit >= m_source.Length ? false : true; | 50 return unit < m_source.Length; |
52 } | 51 } |
53 | 52 |
54 protected override void InvokeUnit(int unit) { | 53 protected override void InvokeUnit(int unit) { |
55 try { | 54 try { |
56 m_action(m_source[unit]); | 55 m_action(m_source[unit]); |
84 m_dest = new TDst[source.Length]; | 83 m_dest = new TDst[source.Length]; |
85 m_pending = source.Length; | 84 m_pending = source.Length; |
86 m_transform = transform; | 85 m_transform = transform; |
87 m_traceContext = TraceContext.Snapshot(); | 86 m_traceContext = TraceContext.Snapshot(); |
88 | 87 |
89 m_promise.Anyway(() => Dispose()); | 88 m_promise.Finally(Dispose); |
90 m_promise.Cancelled(() => Dispose()); | |
91 | 89 |
92 InitPool(); | 90 InitPool(); |
93 } | 91 } |
94 | 92 |
95 public Promise<TDst[]> Promise { | 93 public Promise<TDst[]> Promise { |
155 var res = new TDst[source.Length]; | 153 var res = new TDst[source.Length]; |
156 var pending = source.Length; | 154 var pending = source.Length; |
157 | 155 |
158 var semaphore = new Semaphore(threads, threads); | 156 var semaphore = new Semaphore(threads, threads); |
159 | 157 |
158 // Analysis disable AccessToDisposedClosure | |
160 AsyncPool.InvokeNewThread(() => { | 159 AsyncPool.InvokeNewThread(() => { |
161 for (int i = 0; i < source.Length; i++) { | 160 for (int i = 0; i < source.Length; i++) { |
162 if(promise.IsResolved) | 161 if(promise.IsResolved) |
163 break; // stop processing in case of error or cancellation | 162 break; // stop processing in case of error or cancellation |
164 var idx = i; | 163 var idx = i; |
164 | |
165 semaphore.WaitOne(); | 165 semaphore.WaitOne(); |
166 try { | 166 try { |
167 var p1 = transform(source[i]); | 167 var p1 = transform(source[i]); |
168 p1.Anyway(() => semaphore.Release()); | 168 p1.Finally(() => semaphore.Release()); |
169 p1.Cancelled(() => semaphore.Release()); | |
170 p1.Then( | 169 p1.Then( |
171 x => { | 170 x => { |
172 res[idx] = x; | 171 res[idx] = x; |
173 var left = Interlocked.Decrement(ref pending); | 172 var left = Interlocked.Decrement(ref pending); |
174 if (left == 0) | 173 if (left == 0) |
185 } | 184 } |
186 } | 185 } |
187 return 0; | 186 return 0; |
188 }); | 187 }); |
189 | 188 |
190 return promise.Anyway(() => semaphore.Dispose()); | 189 return promise.Finally(semaphore.Dispose); |
191 } | 190 } |
192 | 191 |
193 } | 192 } |
194 } | 193 } |