Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 140:f973c5df9972 v2
fixes
| author | cin |
|---|---|
| date | Fri, 20 Feb 2015 15:58:34 +0300 |
| parents | a336cb13c6a9 |
| children | 706fccb85524 |
| rev | line source |
|---|---|
| 41 | 1 using Implab.Diagnostics; |
| 2 using System; | |
| 15 | 3 using System.Diagnostics; |
| 4 using System.Threading; | |
| 5 | |
| 6 namespace Implab.Parallels { | |
| 7 public static class ArrayTraits { | |
| 8 class ArrayIterator<TSrc> : DispatchPool<int> { | |
| 9 readonly Action<TSrc> m_action; | |
| 10 readonly TSrc[] m_source; | |
| 11 readonly Promise<int> m_promise = new Promise<int>(); | |
| 92 | 12 readonly LogicalOperation m_logicalOperation; |
| 15 | 13 |
| 14 int m_pending; | |
| 15 int m_next; | |
| 16 | |
| 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | |
| 18 : base(threads) { | |
| 19 | |
| 20 Debug.Assert(source != null); | |
| 21 Debug.Assert(action != null); | |
| 22 | |
| 92 | 23 m_logicalOperation = TraceContext.Instance.CurrentOperation; |
| 15 | 24 m_next = 0; |
| 25 m_source = source; | |
| 26 m_pending = source.Length; | |
| 27 m_action = action; | |
| 28 | |
|
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
104
diff
changeset
|
29 m_promise.On(Dispose, PromiseEventType.All); |
| 15 | 30 |
| 31 InitPool(); | |
| 32 } | |
| 33 | |
| 34 public Promise<int> Promise { | |
| 35 get { | |
| 36 return m_promise; | |
| 37 } | |
| 38 } | |
| 39 | |
| 41 | 40 protected override void Worker() { |
| 92 | 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); |
| 42 try { | |
| 43 base.Worker(); | |
| 44 } finally { | |
| 45 TraceContext.Instance.Leave(); | |
| 46 } | |
| 41 | 47 } |
| 48 | |
| 15 | 49 protected override bool TryDequeue(out int unit) { |
| 16 | 50 unit = Interlocked.Increment(ref m_next) - 1; |
| 75 | 51 return unit < m_source.Length; |
| 15 | 52 } |
| 53 | |
| 54 protected override void InvokeUnit(int unit) { | |
| 55 try { | |
| 56 m_action(m_source[unit]); | |
| 16 | 57 var pending = Interlocked.Decrement(ref m_pending); |
| 15 | 58 if (pending == 0) |
| 59 m_promise.Resolve(m_source.Length); | |
| 60 } catch (Exception e) { | |
| 61 m_promise.Reject(e); | |
| 62 } | |
| 63 } | |
| 64 } | |
| 65 | |
| 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | |
| 67 readonly Func<TSrc, TDst> m_transform; | |
| 68 readonly TSrc[] m_source; | |
| 69 readonly TDst[] m_dest; | |
| 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | |
| 92 | 71 readonly LogicalOperation m_logicalOperation; |
| 15 | 72 |
| 73 int m_pending; | |
| 74 int m_next; | |
| 75 | |
| 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | |
| 77 : base(threads) { | |
| 78 | |
| 79 Debug.Assert (source != null); | |
| 80 Debug.Assert( transform != null); | |
| 81 | |
| 82 m_next = 0; | |
| 83 m_source = source; | |
| 84 m_dest = new TDst[source.Length]; | |
| 85 m_pending = source.Length; | |
| 86 m_transform = transform; | |
| 92 | 87 m_logicalOperation = TraceContext.Instance.CurrentOperation; |
| 15 | 88 |
|
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
104
diff
changeset
|
89 m_promise.On(Dispose, PromiseEventType.All); |
| 15 | 90 |
| 91 InitPool(); | |
| 92 } | |
| 93 | |
| 94 public Promise<TDst[]> Promise { | |
| 95 get { | |
| 96 return m_promise; | |
| 97 } | |
| 98 } | |
| 99 | |
| 41 | 100 protected override void Worker() { |
| 92 | 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); |
| 102 try { | |
| 103 base.Worker(); | |
| 104 } finally { | |
| 105 TraceContext.Instance.Leave(); | |
| 106 } | |
| 41 | 107 } |
| 108 | |
| 15 | 109 protected override bool TryDequeue(out int unit) { |
| 16 | 110 unit = Interlocked.Increment(ref m_next) - 1; |
| 92 | 111 return unit < m_source.Length; |
| 15 | 112 } |
| 113 | |
| 114 protected override void InvokeUnit(int unit) { | |
| 115 try { | |
| 116 m_dest[unit] = m_transform(m_source[unit]); | |
| 16 | 117 var pending = Interlocked.Decrement(ref m_pending); |
| 15 | 118 if (pending == 0) |
| 119 m_promise.Resolve(m_dest); | |
| 120 } catch (Exception e) { | |
| 121 m_promise.Reject(e); | |
| 122 } | |
| 123 } | |
| 124 } | |
| 125 | |
| 30 | 126 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { |
| 15 | 127 if (source == null) |
| 128 throw new ArgumentNullException("source"); | |
| 129 if (transform == null) | |
| 130 throw new ArgumentNullException("transform"); | |
| 131 | |
| 132 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); | |
| 133 return mapper.Promise; | |
| 134 } | |
| 135 | |
| 30 | 136 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { |
| 15 | 137 if (source == null) |
| 138 throw new ArgumentNullException("source"); | |
| 139 if (action == null) | |
| 140 throw new ArgumentNullException("action"); | |
| 141 | |
| 142 var iter = new ArrayIterator<TSrc>(source, action, threads); | |
| 143 return iter.Promise; | |
| 144 } | |
| 16 | 145 |
| 101 | 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) { |
| 16 | 147 if (source == null) |
| 148 throw new ArgumentNullException("source"); | |
| 149 if (transform == null) | |
| 150 throw new ArgumentNullException("transform"); | |
| 151 if (threads <= 0) | |
| 101 | 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); |
| 16 | 153 |
| 32 | 154 if (source.Length == 0) |
| 155 return Promise<TDst[]>.ResultToPromise(new TDst[0]); | |
| 156 | |
| 16 | 157 var promise = new Promise<TDst[]>(); |
| 158 var res = new TDst[source.Length]; | |
| 159 var pending = source.Length; | |
| 30 | 160 |
| 80 | 161 object locker = new object(); |
| 162 int slots = threads; | |
| 16 | 163 |
| 75 | 164 // Analysis disable AccessToDisposedClosure |
| 124 | 165 AsyncPool.RunThread<int>(() => { |
| 16 | 166 for (int i = 0; i < source.Length; i++) { |
|
19
e3935fdf59a2
Promise is rewritten to use interlocked operations instead of locks
cin
parents:
16
diff
changeset
|
167 if(promise.IsResolved) |
| 16 | 168 break; // stop processing in case of error or cancellation |
| 169 var idx = i; | |
| 75 | 170 |
| 89 | 171 if (Interlocked.Decrement(ref slots) < 0) { |
| 172 lock(locker) { | |
| 173 while(slots < 0) | |
| 174 Monitor.Wait(locker); | |
| 175 } | |
| 80 | 176 } |
| 89 | 177 |
| 16 | 178 try { |
| 80 | 179 transform(source[i]) |
|
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
104
diff
changeset
|
180 .On( x => { |
| 89 | 181 Interlocked.Increment(ref slots); |
| 182 lock (locker) { | |
| 80 | 183 Monitor.Pulse(locker); |
| 184 } | |
| 185 }) | |
| 104 | 186 .On( |
| 80 | 187 x => { |
| 188 res[idx] = x; | |
| 189 var left = Interlocked.Decrement(ref pending); | |
| 190 if (left == 0) | |
| 191 promise.Resolve(res); | |
| 192 }, | |
| 89 | 193 promise.Reject |
| 80 | 194 ); |
| 16 | 195 |
| 196 } catch (Exception e) { | |
| 197 promise.Reject(e); | |
| 198 } | |
| 199 } | |
| 200 return 0; | |
| 201 }); | |
| 202 | |
| 80 | 203 return promise; |
| 16 | 204 } |
| 24 | 205 |
| 15 | 206 } |
| 207 } |
