Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 44:e5ec543feee3
Рефакторинг, журналирование
| author | cin |
|---|---|
| date | Wed, 16 Apr 2014 19:02:58 +0400 |
| parents | 2fc0fbe7d58b |
| children | d9d794b61bb9 |
| rev | line source |
|---|---|
| 41 | 1 using Implab.Diagnostics; |
| 2 using System; | |
| 15 | 3 using System.Collections.Generic; |
| 4 using System.Diagnostics; | |
| 5 using System.Linq; | |
| 6 using System.Text; | |
| 7 using System.Threading; | |
| 8 | |
| 9 namespace Implab.Parallels { | |
| 10 public static class ArrayTraits { | |
/// <summary>
/// Dispatch pool whose work units are indexes into an array; each unit invokes
/// the supplied action on the element at that index.
/// </summary>
/// <typeparam name="TSrc">Element type of the array being iterated.</typeparam>
class ArrayIterator<TSrc> : DispatchPool<int> {
    readonly Action<TSrc> m_action;
    readonly TSrc[] m_source;
    readonly Promise<int> m_promise = new Promise<int>();
    readonly TraceContext m_traceContext;

    // Count of elements whose action has not completed successfully yet.
    int m_pending;
    // Counter used by TryDequeue to hand out the next array index.
    int m_next;

    /// <summary>
    /// Stores the inputs, wires pool disposal to the promise and calls InitPool.
    /// </summary>
    /// <param name="source">Array to iterate; must not be null.</param>
    /// <param name="action">Action invoked for each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(action != null);

        m_source = source;
        m_action = action;
        m_pending = source.Length;
        m_next = 0;
        // Captured here so Worker() can transfer it before running the base worker.
        m_traceContext = TraceContext.Snapshot();

        // Dispose the pool through both the Anyway and the Cancelled callbacks of the promise.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the array length once every element has been processed,
    /// or rejected with the exception caught while processing an element.
    /// </summary>
    public Promise<int> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Transfers the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Transfer(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index (it may be past the end of the array).</param>
    /// <returns><c>true</c> when the claimed index is inside the array; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the post-increment value, so the claimed index is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Runs the action for one element and resolves the promise after the last one.
    /// </summary>
    /// <param name="unit">Index of the element to process.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_action(m_source[unit]);
            // Whichever call brings the counter to zero completes the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_source.Length);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
| 65 | |
/// <summary>
/// Dispatch pool whose work units are indexes into an array; each unit transforms
/// the source element at that index and stores the result at the same index of
/// the destination array.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
class ArrayMapper<TSrc, TDst> : DispatchPool<int> {
    readonly Func<TSrc, TDst> m_transform;
    readonly TSrc[] m_source;
    readonly TDst[] m_dest;
    readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
    readonly TraceContext m_traceContext;

    // Count of elements that have not been transformed successfully yet.
    int m_pending;
    // Counter used by TryDequeue to hand out the next array index.
    int m_next;

    /// <summary>
    /// Stores the inputs, allocates the destination array, wires pool disposal
    /// to the promise and calls InitPool.
    /// </summary>
    /// <param name="source">Array to transform; must not be null.</param>
    /// <param name="transform">Function applied to each element; must not be null.</param>
    /// <param name="threads">Value forwarded to the base pool constructor.</param>
    public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
        : base(threads) {

        Debug.Assert(source != null);
        Debug.Assert(transform != null);

        m_source = source;
        m_transform = transform;
        m_dest = new TDst[source.Length];
        m_pending = source.Length;
        m_next = 0;
        // Captured here so Worker() can transfer it before running the base worker.
        m_traceContext = TraceContext.Snapshot();

        // Dispose the pool through both the Anyway and the Cancelled callbacks of the promise.
        m_promise.Anyway(() => Dispose());
        m_promise.Cancelled(() => Dispose());

        InitPool();
    }

    /// <summary>
    /// Promise resolved with the destination array once every element has been
    /// transformed, or rejected with the exception caught while transforming an element.
    /// </summary>
    public Promise<TDst[]> Promise {
        get {
            return m_promise;
        }
    }

    /// <summary>
    /// Transfers the trace context captured in the constructor, then runs the base worker.
    /// </summary>
    protected override void Worker() {
        TraceContext.Transfer(m_traceContext);
        base.Worker();
    }

    /// <summary>
    /// Atomically claims the next array index.
    /// </summary>
    /// <param name="unit">Receives the claimed index (it may be past the end of the array).</param>
    /// <returns><c>true</c> when the claimed index is inside the array; otherwise <c>false</c>.</returns>
    protected override bool TryDequeue(out int unit) {
        // Increment returns the post-increment value, so the claimed index is one less.
        var claimed = Interlocked.Increment(ref m_next);
        unit = claimed - 1;
        return unit < m_source.Length;
    }

    /// <summary>
    /// Transforms one element and resolves the promise after the last one.
    /// </summary>
    /// <param name="unit">Index of the element to transform.</param>
    protected override void InvokeUnit(int unit) {
        try {
            m_dest[unit] = m_transform(m_source[unit]);
            // Whichever call brings the counter to zero completes the promise.
            if (Interlocked.Decrement(ref m_pending) == 0)
                m_promise.Resolve(m_dest);
        } catch (Exception error) {
            m_promise.Reject(error);
        }
    }
}
| 122 | |
/// <summary>
/// Transforms every element of <paramref name="source"/> using an
/// <c>ArrayMapper</c> dispatch pool and returns a promise for the results.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Function applied to each element.</param>
/// <param name="threads">
/// Value passed to the pool constructor. It is not used when
/// <paramref name="source"/> is empty, because no pool is created then.
/// </param>
/// <returns>
/// A promise resolved with an array whose element <c>i</c> is the transformed
/// <c>source[i]</c>, or rejected with the exception thrown by
/// <paramref name="transform"/>. For an empty array the promise is created
/// already resolved with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
public static IPromise<TDst[]> ParallelMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");

    // ArrayMapper resolves its promise only from InvokeUnit, when the pending
    // counter drops to zero. With an empty array no unit is ever dequeued, so
    // the promise would never settle. Complete immediately, as ChainedMap does.
    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads);
    return mapper.Promise;
}
| 132 | |
/// <summary>
/// Invokes <paramref name="action"/> for every element of
/// <paramref name="source"/> using an <c>ArrayIterator</c> dispatch pool.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <param name="source">Array to iterate.</param>
/// <param name="action">Action invoked for each element.</param>
/// <param name="threads">
/// Value passed to the pool constructor. It is not used when
/// <paramref name="source"/> is empty, because no pool is created then.
/// </param>
/// <returns>
/// A promise resolved with the number of elements in
/// <paramref name="source"/> once all of them are processed, or rejected with
/// the exception thrown by <paramref name="action"/>. For an empty array the
/// promise is created already resolved with 0.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is null.
/// </exception>
public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (action == null)
        throw new ArgumentNullException("action");

    // ArrayIterator resolves its promise only from InvokeUnit, when the pending
    // counter drops to zero. With an empty array no unit is ever dequeued, so
    // the promise would never settle. Complete immediately with the element
    // count (0), the same value the iterator reports for non-empty arrays.
    if (source.Length == 0)
        return Promise<int>.ResultToPromise(0);

    var iter = new ArrayIterator<TSrc>(source, action, threads);
    return iter.Promise;
}
| 16 | 142 |
/// <summary>
/// Maps every element of <paramref name="source"/> through an operation that
/// returns a promise. A semaphore with <paramref name="threads"/> slots is
/// acquired before each operation is started, which bounds how many
/// operations are started before earlier ones release their slot.
/// </summary>
/// <typeparam name="TSrc">Element type of the source array.</typeparam>
/// <typeparam name="TDst">Element type of the resulting array.</typeparam>
/// <param name="source">Array to transform.</param>
/// <param name="transform">Operation started for each element; returns a promise of the result.</param>
/// <param name="threads">Number of semaphore slots; must be greater than zero.</param>
/// <returns>
/// A promise resolved with an array whose element <c>i</c> is the result for
/// <c>source[i]</c>, or rejected with the first reported error. For an empty
/// array the promise is created already resolved with an empty array.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="transform"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="threads"/> is less than or equal to zero.
/// </exception>
public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
    if (source == null)
        throw new ArgumentNullException("source");
    if (transform == null)
        throw new ArgumentNullException("transform");
    // The single-string ArgumentOutOfRangeException constructor interprets its
    // argument as the parameter name, so the name and the message are passed separately.
    if (threads <= 0)
        throw new ArgumentOutOfRangeException("threads", "Threads number must be greater then zero");

    if (source.Length == 0)
        return Promise<TDst[]>.ResultToPromise(new TDst[0]);

    var promise = new Promise<TDst[]>();
    var res = new TDst[source.Length];
    var pending = source.Length;

    var semaphore = new Semaphore(threads, threads);

    AsyncPool.InvokeNewThread(() => {
        for (int i = 0; i < source.Length; i++) {
            if (promise.IsResolved)
                break; // stop processing in case of error or cancellation

            // Copy the loop variable so the callbacks below capture this iteration's index.
            var idx = i;

            // NOTE(review): the semaphore is disposed by the callback registered on the
            // returned promise, so this wait may run against a disposed semaphore after
            // a rejection - confirm how AsyncPool handles an exception raised here.
            semaphore.WaitOne();
            try {
                var p1 = transform(source[idx]);

                // Give the slot back through the Anyway and Cancelled callbacks of the operation.
                p1.Anyway(() => semaphore.Release());
                p1.Cancelled(() => semaphore.Release());
                p1.Then(
                    x => {
                        res[idx] = x;
                        // The callback that brings the counter to zero publishes the result.
                        var left = Interlocked.Decrement(ref pending);
                        if (left == 0)
                            promise.Resolve(res);
                    },
                    e => promise.Reject(e)
                );

            } catch (Exception e) {
                // NOTE(review): if transform itself throws, the slot acquired above is
                // not released here; the loop is expected to stop on the next
                // IsResolved check - confirm IsResolved is true after Reject.
                promise.Reject(e);
            }
        }
        return 0;
    });

    return promise.Anyway(() => semaphore.Dispose());
}
| 24 | 189 |
| 15 | 190 } |
| 191 } |
