Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | 5f10d54b45df |
children | a336cb13c6a9 |
comparison legend: equal | deleted | inserted | replaced
118:e046a94eecb1 | 119:2573b562e328 |
---|---|
24 m_next = 0; | 24 m_next = 0; |
25 m_source = source; | 25 m_source = source; |
26 m_pending = source.Length; | 26 m_pending = source.Length; |
27 m_action = action; | 27 m_action = action; |
28 | 28 |
29 m_promise.Anyway(Dispose); | 29 m_promise.On(Dispose, PromiseEventType.All); |
30 | 30 |
31 InitPool(); | 31 InitPool(); |
32 } | 32 } |
33 | 33 |
34 public Promise<int> Promise { | 34 public Promise<int> Promise { |
84 m_dest = new TDst[source.Length]; | 84 m_dest = new TDst[source.Length]; |
85 m_pending = source.Length; | 85 m_pending = source.Length; |
86 m_transform = transform; | 86 m_transform = transform; |
87 m_logicalOperation = TraceContext.Instance.CurrentOperation; | 87 m_logicalOperation = TraceContext.Instance.CurrentOperation; |
88 | 88 |
89 m_promise.Anyway(Dispose); | 89 m_promise.On(Dispose, PromiseEventType.All); |
90 | 90 |
91 InitPool(); | 91 InitPool(); |
92 } | 92 } |
93 | 93 |
94 public Promise<TDst[]> Promise { | 94 public Promise<TDst[]> Promise { |
160 | 160 |
161 object locker = new object(); | 161 object locker = new object(); |
162 int slots = threads; | 162 int slots = threads; |
163 | 163 |
164 // Analysis disable AccessToDisposedClosure | 164 // Analysis disable AccessToDisposedClosure |
165 AsyncPool.InvokeNewThread(() => { | 165 AsyncPool.InvokeNewThread<int>(() => { |
166 for (int i = 0; i < source.Length; i++) { | 166 for (int i = 0; i < source.Length; i++) { |
167 if(promise.IsResolved) | 167 if(promise.IsResolved) |
168 break; // stop processing in case of error or cancellation | 168 break; // stop processing in case of error or cancellation |
169 var idx = i; | 169 var idx = i; |
170 | 170 |
175 } | 175 } |
176 } | 176 } |
177 | 177 |
178 try { | 178 try { |
179 transform(source[i]) | 179 transform(source[i]) |
180 .Anyway(() => { | 180 .On( x => { |
181 Interlocked.Increment(ref slots); | 181 Interlocked.Increment(ref slots); |
182 lock (locker) { | 182 lock (locker) { |
183 Monitor.Pulse(locker); | 183 Monitor.Pulse(locker); |
184 } | 184 } |
185 }) | 185 }) |