comparison Implab/Parallels/ArrayTraits.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents 5f10d54b45df
children a336cb13c6a9
comparison
equal deleted inserted replaced
118:e046a94eecb1 119:2573b562e328
24 m_next = 0; 24 m_next = 0;
25 m_source = source; 25 m_source = source;
26 m_pending = source.Length; 26 m_pending = source.Length;
27 m_action = action; 27 m_action = action;
28 28
29 m_promise.Anyway(Dispose); 29 m_promise.On(Dispose, PromiseEventType.All);
30 30
31 InitPool(); 31 InitPool();
32 } 32 }
33 33
34 public Promise<int> Promise { 34 public Promise<int> Promise {
84 m_dest = new TDst[source.Length]; 84 m_dest = new TDst[source.Length];
85 m_pending = source.Length; 85 m_pending = source.Length;
86 m_transform = transform; 86 m_transform = transform;
87 m_logicalOperation = TraceContext.Instance.CurrentOperation; 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 m_promise.Anyway(Dispose); 89 m_promise.On(Dispose, PromiseEventType.All);
90 90
91 InitPool(); 91 InitPool();
92 } 92 }
93 93
94 public Promise<TDst[]> Promise { 94 public Promise<TDst[]> Promise {
160 160
161 object locker = new object(); 161 object locker = new object();
162 int slots = threads; 162 int slots = threads;
163 163
164 // Analysis disable AccessToDisposedClosure 164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.InvokeNewThread(() => { 165 AsyncPool.InvokeNewThread<int>(() => {
166 for (int i = 0; i < source.Length; i++) { 166 for (int i = 0; i < source.Length; i++) {
167 if(promise.IsResolved) 167 if(promise.IsResolved)
168 break; // stop processing in case of error or cancellation 168 break; // stop processing in case of error or cancellation
169 var idx = i; 169 var idx = i;
170 170
175 } 175 }
176 } 176 }
177 177
178 try { 178 try {
179 transform(source[i]) 179 transform(source[i])
180 .Anyway(() => { 180 .On( x => {
181 Interlocked.Increment(ref slots); 181 Interlocked.Increment(ref slots);
182 lock (locker) { 182 lock (locker) {
183 Monitor.Pulse(locker); 183 Monitor.Pulse(locker);
184 } 184 }
185 }) 185 })