Mercurial > pub > ImplabNet
diff Implab/Parallels/ArrayTraits.cs @ 16:5a4b735ba669 promises
sync
author | cin |
---|---|
date | Thu, 07 Nov 2013 20:20:26 +0400 |
parents | 0f982f9b7d4d |
children | e3935fdf59a2 |
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Thu Nov 07 20:20:26 2013 +0400 @@ -39,26 +39,14 @@ } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_action(m_source[unit]); - int pending; - do { - pending = m_pending; - } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); - pending--; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_source.Length); } catch (Exception e) { @@ -101,26 +89,14 @@ } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_dest[unit] = m_transform(m_source[unit]); - int pending; - do { - pending = m_pending; - } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); - pending --; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_dest); } catch (Exception e) { @@ -148,5 +124,48 @@ var iter = new ArrayIterator<TSrc>(source, action, threads); return iter.Promise; } + + public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise<TDst[]>(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } } }