Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 79:05e6468f066f v2
sync
| author | cin | 
|---|---|
| date | Mon, 22 Sep 2014 18:20:49 +0400 | 
| parents | c761fc982e1d | 
| children | 4f20870d0816 | 
| rev | line source | 
|---|---|
| 41 | 1 using Implab.Diagnostics; | 
| 2 using System; | |
| 15 | 3 using System.Collections.Generic; | 
| 4 using System.Diagnostics; | |
| 5 using System.Linq; | |
| 6 using System.Text; | |
| 7 using System.Threading; | |
| 8 | |
| 9 namespace Implab.Parallels { | |
| 10 public static class ArrayTraits { | |
| 11 class ArrayIterator<TSrc> : DispatchPool<int> { | |
| 12 readonly Action<TSrc> m_action; | |
| 13 readonly TSrc[] m_source; | |
| 14 readonly Promise<int> m_promise = new Promise<int>(); | |
| 41 | 15 readonly TraceContext m_traceContext; | 
| 15 | 16 | 
| 17 int m_pending; | |
| 18 int m_next; | |
| 19 | |
| 20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | |
| 21 : base(threads) { | |
| 22 | |
| 23 Debug.Assert(source != null); | |
| 24 Debug.Assert(action != null); | |
| 25 | |
| 41 | 26 m_traceContext = TraceContext.Snapshot(); | 
| 15 | 27 m_next = 0; | 
| 28 m_source = source; | |
| 29 m_pending = source.Length; | |
| 30 m_action = action; | |
| 31 | |
| 76 | 32 m_promise.Anyway(Dispose); | 
| 15 | 33 | 
| 34 InitPool(); | |
| 35 } | |
| 36 | |
| 37 public Promise<int> Promise { | |
| 38 get { | |
| 39 return m_promise; | |
| 40 } | |
| 41 } | |
| 42 | |
| 41 | 43 protected override void Worker() { | 
| 48 | 44 TraceContext.Fork(m_traceContext); | 
| 41 | 45 base.Worker(); | 
| 46 } | |
| 47 | |
| 15 | 48 protected override bool TryDequeue(out int unit) { | 
| 16 | 49 unit = Interlocked.Increment(ref m_next) - 1; | 
| 75 | 50 return unit < m_source.Length; | 
| 15 | 51 } | 
| 52 | |
| 53 protected override void InvokeUnit(int unit) { | |
| 54 try { | |
| 55 m_action(m_source[unit]); | |
| 16 | 56 var pending = Interlocked.Decrement(ref m_pending); | 
| 15 | 57 if (pending == 0) | 
| 58 m_promise.Resolve(m_source.Length); | |
| 59 } catch (Exception e) { | |
| 60 m_promise.Reject(e); | |
| 61 } | |
| 62 } | |
| 63 } | |
| 64 | |
| 65 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | |
| 66 readonly Func<TSrc, TDst> m_transform; | |
| 67 readonly TSrc[] m_source; | |
| 68 readonly TDst[] m_dest; | |
| 69 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | |
| 41 | 70 readonly TraceContext m_traceContext; | 
| 15 | 71 | 
| 72 int m_pending; | |
| 73 int m_next; | |
| 74 | |
| 75 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | |
| 76 : base(threads) { | |
| 77 | |
| 78 Debug.Assert (source != null); | |
| 79 Debug.Assert( transform != null); | |
| 80 | |
| 81 m_next = 0; | |
| 82 m_source = source; | |
| 83 m_dest = new TDst[source.Length]; | |
| 84 m_pending = source.Length; | |
| 85 m_transform = transform; | |
| 41 | 86 m_traceContext = TraceContext.Snapshot(); | 
| 15 | 87 | 
| 76 | 88 m_promise.Anyway(Dispose); | 
| 15 | 89 | 
| 90 InitPool(); | |
| 91 } | |
| 92 | |
| 93 public Promise<TDst[]> Promise { | |
| 94 get { | |
| 95 return m_promise; | |
| 96 } | |
| 97 } | |
| 98 | |
| 41 | 99 protected override void Worker() { | 
| 48 | 100 TraceContext.Fork(m_traceContext); | 
| 41 | 101 base.Worker(); | 
| 102 } | |
| 103 | |
| 15 | 104 protected override bool TryDequeue(out int unit) { | 
| 16 | 105 unit = Interlocked.Increment(ref m_next) - 1; | 
| 106 return unit >= m_source.Length ? false : true; | |
| 15 | 107 } | 
| 108 | |
| 109 protected override void InvokeUnit(int unit) { | |
| 110 try { | |
| 111 m_dest[unit] = m_transform(m_source[unit]); | |
| 16 | 112 var pending = Interlocked.Decrement(ref m_pending); | 
| 15 | 113 if (pending == 0) | 
| 114 m_promise.Resolve(m_dest); | |
| 115 } catch (Exception e) { | |
| 116 m_promise.Reject(e); | |
| 117 } | |
| 118 } | |
| 119 } | |
| 120 | |
| 30 | 121 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { | 
| 15 | 122 if (source == null) | 
| 123 throw new ArgumentNullException("source"); | |
| 124 if (transform == null) | |
| 125 throw new ArgumentNullException("transform"); | |
| 126 | |
| 127 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); | |
| 128 return mapper.Promise; | |
| 129 } | |
| 130 | |
| 30 | 131 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { | 
| 15 | 132 if (source == null) | 
| 133 throw new ArgumentNullException("source"); | |
| 134 if (action == null) | |
| 135 throw new ArgumentNullException("action"); | |
| 136 | |
| 137 var iter = new ArrayIterator<TSrc>(source, action, threads); | |
| 138 return iter.Promise; | |
| 139 } | |
| 16 | 140 | 
| 76 | 141 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) { | 
| 16 | 142 if (source == null) | 
| 143 throw new ArgumentNullException("source"); | |
| 144 if (transform == null) | |
| 145 throw new ArgumentNullException("transform"); | |
| 146 if (threads <= 0) | |
| 147 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
| 148 | |
| 32 | 149 if (source.Length == 0) | 
| 150 return Promise<TDst[]>.ResultToPromise(new TDst[0]); | |
| 151 | |
| 16 | 152 var promise = new Promise<TDst[]>(); | 
| 153 var res = new TDst[source.Length]; | |
| 154 var pending = source.Length; | |
| 30 | 155 | 
| 16 | 156 var semaphore = new Semaphore(threads, threads); | 
| 157 | |
| 75 | 158 // Analysis disable AccessToDisposedClosure | 
| 16 | 159 AsyncPool.InvokeNewThread(() => { | 
| 160 for (int i = 0; i < source.Length; i++) { | |
| 19 
e3935fdf59a2
Promise is rewritten to use interlocked operations instead of locks
 cin parents: 
16diff
changeset | 161 if(promise.IsResolved) | 
| 16 | 162 break; // stop processing in case of error or cancellation | 
| 163 var idx = i; | |
| 75 | 164 | 
| 16 | 165 semaphore.WaitOne(); | 
| 166 try { | |
| 167 var p1 = transform(source[i]); | |
| 76 | 168 p1.Anyway(() => semaphore.Release()); | 
| 16 | 169 p1.Then( | 
| 170 x => { | |
| 171 res[idx] = x; | |
| 172 var left = Interlocked.Decrement(ref pending); | |
| 173 if (left == 0) | |
| 174 promise.Resolve(res); | |
| 175 }, | |
| 72 | 176 e => { | 
| 177 promise.Reject(e); | |
| 178 throw new TransientPromiseException(e); | |
| 179 } | |
| 16 | 180 ); | 
| 181 | |
| 182 } catch (Exception e) { | |
| 183 promise.Reject(e); | |
| 184 } | |
| 185 } | |
| 186 return 0; | |
| 187 }); | |
| 188 | |
| 76 | 189 return promise.Anyway(semaphore.Dispose); | 
| 16 | 190 } | 
| 24 | 191 | 
| 15 | 192 } | 
| 193 } | 
