comparison Implab/Parallels/ArrayTraits.cs @ 75:4439140706d0 v2

major refactoring, added tasks support
author cin
date Wed, 10 Sep 2014 11:17:37 +0400
parents d67b95eddaf4
children c761fc982e1d
comparison
equal deleted inserted replaced
74:c4140283575c 75:4439140706d0
27 m_next = 0; 27 m_next = 0;
28 m_source = source; 28 m_source = source;
29 m_pending = source.Length; 29 m_pending = source.Length;
30 m_action = action; 30 m_action = action;
31 31
32 m_promise.Anyway(() => Dispose()); 32 m_promise.Finally(Dispose);
33 m_promise.Cancelled(() => Dispose());
34 33
35 InitPool(); 34 InitPool();
36 } 35 }
37 36
38 public Promise<int> Promise { 37 public Promise<int> Promise {
46 base.Worker(); 45 base.Worker();
47 } 46 }
48 47
49 protected override bool TryDequeue(out int unit) { 48 protected override bool TryDequeue(out int unit) {
50 unit = Interlocked.Increment(ref m_next) - 1; 49 unit = Interlocked.Increment(ref m_next) - 1;
51 return unit >= m_source.Length ? false : true; 50 return unit < m_source.Length;
52 } 51 }
53 52
54 protected override void InvokeUnit(int unit) { 53 protected override void InvokeUnit(int unit) {
55 try { 54 try {
56 m_action(m_source[unit]); 55 m_action(m_source[unit]);
84 m_dest = new TDst[source.Length]; 83 m_dest = new TDst[source.Length];
85 m_pending = source.Length; 84 m_pending = source.Length;
86 m_transform = transform; 85 m_transform = transform;
87 m_traceContext = TraceContext.Snapshot(); 86 m_traceContext = TraceContext.Snapshot();
88 87
89 m_promise.Anyway(() => Dispose()); 88 m_promise.Finally(Dispose);
90 m_promise.Cancelled(() => Dispose());
91 89
92 InitPool(); 90 InitPool();
93 } 91 }
94 92
95 public Promise<TDst[]> Promise { 93 public Promise<TDst[]> Promise {
155 var res = new TDst[source.Length]; 153 var res = new TDst[source.Length];
156 var pending = source.Length; 154 var pending = source.Length;
157 155
158 var semaphore = new Semaphore(threads, threads); 156 var semaphore = new Semaphore(threads, threads);
159 157
158 // Analysis disable AccessToDisposedClosure
160 AsyncPool.InvokeNewThread(() => { 159 AsyncPool.InvokeNewThread(() => {
161 for (int i = 0; i < source.Length; i++) { 160 for (int i = 0; i < source.Length; i++) {
162 if(promise.IsResolved) 161 if(promise.IsResolved)
163 break; // stop processing in case of error or cancellation 162 break; // stop processing in case of error or cancellation
164 var idx = i; 163 var idx = i;
164
165 semaphore.WaitOne(); 165 semaphore.WaitOne();
166 try { 166 try {
167 var p1 = transform(source[i]); 167 var p1 = transform(source[i]);
168 p1.Anyway(() => semaphore.Release()); 168 p1.Finally(() => semaphore.Release());
169 p1.Cancelled(() => semaphore.Release());
170 p1.Then( 169 p1.Then(
171 x => { 170 x => {
172 res[idx] = x; 171 res[idx] = x;
173 var left = Interlocked.Decrement(ref pending); 172 var left = Interlocked.Decrement(ref pending);
174 if (left == 0) 173 if (left == 0)
185 } 184 }
186 } 185 }
187 return 0; 186 return 0;
188 }); 187 });
189 188
190 return promise.Anyway(() => semaphore.Dispose()); 189 return promise.Finally(semaphore.Dispose);
191 } 190 }
192 191
193 } 192 }
194 } 193 }