Mercurial > pub > ImplabNet
diff Implab/Parallels/ArrayTraits.cs @ 18:0c924dff5498
Слияние с promises
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:27:04 +0400 |
parents | 5a4b735ba669 |
children | e3935fdf59a2 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/ArrayTraits.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public static class ArrayTraits { + class ArrayIterator<TSrc> : DispatchPool<int> { + readonly Action<TSrc> m_action; + readonly TSrc[] m_source; + readonly Promise<int> m_promise = new Promise<int>(); + + int m_pending; + int m_next; + + public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) + : base(threads) { + + Debug.Assert(source != null); + Debug.Assert(action != null); + + m_next = 0; + m_source = source; + m_pending = source.Length; + m_action = action; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<int> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_action(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_source.Length); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + class ArrayMapper<TSrc, TDst>: DispatchPool<int> { + readonly Func<TSrc, TDst> m_transform; + readonly TSrc[] m_source; + readonly TDst[] m_dest; + readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); + + int m_pending; + int m_next; + + public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) + : base(threads) { + + Debug.Assert (source != null); + Debug.Assert( transform != null); + + m_next = 0; + m_source = source; + m_dest = new TDst[source.Length]; + m_pending = source.Length; + m_transform = transform; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<TDst[]> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_dest[unit] = m_transform(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_dest); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + + var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); + return mapper.Promise; + } + + public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (action == null) + throw new ArgumentNullException("action"); + + var iter = new ArrayIterator<TSrc>(source, action, threads); + return iter.Promise; + } + + public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise<TDst[]>(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } + } +}