Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 99:8ddf1648eca4 v2
fixed TransientPromiseException handling
| author | cin |
|---|---|
| date | Wed, 05 Nov 2014 02:31:35 +0300 |
| parents | 4c0e5ef99986 |
| children | 279e226dffdd |
| rev | line source |
|---|---|
| 41 | 1 using Implab.Diagnostics; |
| 2 using System; | |
| 15 | 3 using System.Collections.Generic; |
| 4 using System.Diagnostics; | |
| 5 using System.Linq; | |
| 6 using System.Text; | |
| 7 using System.Threading; | |
| 8 | |
| 9 namespace Implab.Parallels { | |
| 10 public static class ArrayTraits { | |
/// <summary>
/// Dispatch pool that applies an action to every element of an array.
/// Work units are array indices; the exposed promise is resolved with the
/// array length once every element has been processed and rejected when
/// the action (or the resolution itself) throws.
/// </summary>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    // Logical operation that was current when the iterator was created;
    // every worker enters it before doing any work.
    readonly LogicalOperation m_logicalOperation;

    // Number of elements whose action has not completed yet.
    int m_pending;
    // Counter used to hand out the next index to a worker.
    int m_next;

    /// <summary>
    /// Stores the inputs, captures the current logical operation and starts the pool.
    /// </summary>
    /// <param name="source">Array to iterate over; asserted to be non-null.</param>
    /// <param name="action">Action invoked for each element; asserted to be non-null.</param>
    /// <param name="threads">Passed through to the <c>DispatchPool</c> constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose is registered on the promise before the pool is started.
        // NOTE(review): presumably Anyway runs the callback whatever the outcome - confirm.
        m_promise.Anyway(Dispose);

        InitPool();
    }

    /// <summary>
    /// Promise that is resolved with the source length or rejected with an exception.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured by the constructor.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            // Always paired with the EnterLogicalOperation call above.
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Claims the next array index for the calling worker.
    /// </summary>
    /// <param name="unit">Receives the claimed index (may be past the end of the array).</param>
    /// <returns><c>true</c> when the claimed index is inside the array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so the index claimed is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Invokes the action for the element at the given index and resolves the
    /// promise when the last pending element completes.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
| 68 | |
/// <summary>
/// Dispatch pool that transforms every element of a source array into the
/// element with the same index of a destination array. Work units are array
/// indices; the exposed promise is resolved with the destination array once
/// every element has been transformed and rejected when the transform (or
/// the resolution itself) throws.
/// </summary>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    // Logical operation that was current when the mapper was created;
    // every worker enters it before doing any work.
    readonly LogicalOperation m_logicalOperation;

    // Number of elements that have not been transformed yet.
    int m_pending;
    // Counter used to hand out the next index to a worker.
    int m_next;

    /// <summary>
    /// Stores the inputs, allocates the destination array, captures the current
    /// logical operation and starts the pool.
    /// </summary>
    /// <param name="source">Array to transform; asserted to be non-null.</param>
    /// <param name="transform">Function applied to each element; asserted to be non-null.</param>
    /// <param name="threads">Passed through to the <c>DispatchPool</c> constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_next = 0;
        m_logicalOperation = TraceContext.Instance.CurrentOperation;

        // Dispose is registered on the promise before the pool is started.
        // NOTE(review): presumably Anyway runs the callback whatever the outcome - confirm.
        m_promise.Anyway(Dispose);

        InitPool();
    }

    /// <summary>
    /// Promise that is resolved with the destination array or rejected with an exception.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Runs the base worker loop inside the logical operation captured by the constructor.
    /// </summary>
    protected override void Worker() {
        TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
        try {
            base.Worker();
        } finally {
            // Always paired with the EnterLogicalOperation call above.
            TraceContext.Instance.Leave();
        }
    }

    /// <summary>
    /// Claims the next array index for the calling worker.
    /// </summary>
    /// <param name="unit">Receives the claimed index (may be past the end of the array).</param>
    /// <returns><c>true</c> when the claimed index is inside the array.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the new value, so the index claimed is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms the element at the given index, stores the result at the same
    /// index of the destination array and resolves the promise when the last
    /// pending element completes.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
| 128 | |
/// <summary>
/// Transforms every element of <paramref name="source"/> using a pool of worker
/// threads and returns a promise for the array of results. Results keep the
/// index of the element they were produced from.
/// </summary>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">Thread count handed to the underlying dispatch pool.</param>
/// <returns>
/// A promise resolved with the transformed array, or rejected when
/// <paramref name="transform"/> throws.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // ArrayMapper resolves its promise only after the last element has been
    // transformed; with no elements that never happens and the promise would
    // stay pending forever. Complete immediately, as ChainedMap does.
    // No pool is created on this path, so 'threads' is not inspected.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
| 138 | |
/// <summary>
/// Invokes <paramref name="action"/> for every element of <paramref name="source"/>
/// using a pool of worker threads.
/// </summary>
/// <param name="source">Array to iterate over.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">Thread count handed to the underlying dispatch pool.</param>
/// <returns>
/// A promise resolved with the number of elements in <paramref name="source"/>
/// once all of them have been processed, or rejected when
/// <paramref name="action"/> throws.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // ArrayIterator resolves its promise only after the last element has been
    // processed; with no elements that never happens and the promise would
    // stay pending forever. Complete immediately with a count of zero.
    // No pool is created on this path, so 'threads' is not inspected.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
| 16 | 148 |
/// <summary>
/// Maps every element of <paramref name="source"/> through an asynchronous
/// transform, keeping at most <paramref name="threads"/> transforms outstanding
/// at any time, and returns a promise for the array of results. Results keep
/// the index of the element they were produced from.
/// </summary>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function that starts the asynchronous transform of one element.</param>
/// <param name="threads">Maximum number of transforms in flight; must be positive.</param>
/// <returns>
/// A promise resolved with the transformed array, or rejected when a transform
/// throws or its promise fails.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is <c>null</c>.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is zero or negative.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ResultMapper<TSrc, IPromise<TDst>> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    if (threads <= 0)
        // The single-string constructor treats its argument as the parameter
        // name, so the two-argument overload is required to carry a message.
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater than zero");

    // Nothing to schedule: complete immediately with an empty result.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    // 'slots' counts free transform slots; it goes negative while the
    // scheduling loop is waiting for a running transform to finish.
    object locker = new object();
    int slots = threads;

    // Analysis disable AccessToDisposedClosure
    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if (promise.IsResolved)
                break; // stop processing in case of error or cancellation
            // Copy the loop variable so each callback sees its own index.
            var idx = i;

            // Take a slot; when none is free, block until a finished
            // transform gives one back.
            if (Interlocked.Decrement(ref slots) < 0) {
                lock (locker) {
                    while (slots < 0)
                        Monitor.Wait(locker);
                }
            }

            try {
                transform(source[idx])
                    .Anyway(() => {
                        // Return the slot and wake the scheduling loop.
                        Interlocked.Increment(ref slots);
                        lock (locker) {
                            Monitor.Pulse(locker);
                        }
                    })
                    .Last(
                        x => {
                            res[idx] = x;
                            var left = Interlocked.Decrement(ref pending);
                            if (left == 0)
                                promise.Resolve(res);
                        },
                        promise.Reject
                    );

            } catch (Exception e) {
                // The transform failed synchronously.
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise;
}
| 24 | 208 |
| 15 | 209 } |
| 210 } |
