Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | 706fccb85524 |
children | cbe10ac0731e |
comparison
legend: equal | deleted | inserted | replaced
71:1714fd8678ef | 192:f1da3afc3521 |
---|---|
1 using Implab.Diagnostics; | 1 using Implab.Diagnostics; |
2 using System; | 2 using System; |
3 using System.Collections.Generic; | |
4 using System.Diagnostics; | 3 using System.Diagnostics; |
5 using System.Linq; | |
6 using System.Text; | |
7 using System.Threading; | 4 using System.Threading; |
8 | 5 |
9 namespace Implab.Parallels { | 6 namespace Implab.Parallels { |
10 public static class ArrayTraits { | 7 public static class ArrayTraits { |
11 class ArrayIterator<TSrc> : DispatchPool<int> { | 8 class ArrayIterator<TSrc> : DispatchPool<int> { |
12 readonly Action<TSrc> m_action; | 9 readonly Action<TSrc> m_action; |
13 readonly TSrc[] m_source; | 10 readonly TSrc[] m_source; |
14 readonly Promise<int> m_promise = new Promise<int>(); | 11 readonly Promise<int> m_promise = new Promise<int>(); |
15 readonly TraceContext m_traceContext; | 12 readonly LogicalOperation m_logicalOperation; |
16 | 13 |
17 int m_pending; | 14 int m_pending; |
18 int m_next; | 15 int m_next; |
19 | 16 |
20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) |
21 : base(threads) { | 18 : base(threads) { |
22 | 19 |
23 Debug.Assert(source != null); | 20 Debug.Assert(source != null); |
24 Debug.Assert(action != null); | 21 Debug.Assert(action != null); |
25 | 22 |
26 m_traceContext = TraceContext.Snapshot(); | 23 m_logicalOperation = TraceContext.Instance.CurrentOperation; |
27 m_next = 0; | 24 m_next = 0; |
28 m_source = source; | 25 m_source = source; |
29 m_pending = source.Length; | 26 m_pending = source.Length; |
30 m_action = action; | 27 m_action = action; |
31 | 28 |
32 m_promise.Anyway(() => Dispose()); | 29 m_promise.On(Dispose, PromiseEventType.All); |
33 m_promise.Cancelled(() => Dispose()); | |
34 | 30 |
35 InitPool(); | 31 InitPool(); |
36 } | 32 } |
37 | 33 |
38 public Promise<int> Promise { | 34 public Promise<int> Promise { |
40 return m_promise; | 36 return m_promise; |
41 } | 37 } |
42 } | 38 } |
43 | 39 |
44 protected override void Worker() { | 40 protected override void Worker() { |
45 TraceContext.Fork(m_traceContext); | 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false); |
46 base.Worker(); | 42 try { |
43 base.Worker(); | |
44 } finally { | |
45 TraceContext.Instance.Leave(); | |
46 } | |
47 } | 47 } |
48 | 48 |
49 protected override bool TryDequeue(out int unit) { | 49 protected override bool TryDequeue(out int unit) { |
50 unit = Interlocked.Increment(ref m_next) - 1; | 50 unit = Interlocked.Increment(ref m_next) - 1; |
51 return unit >= m_source.Length ? false : true; | 51 return unit < m_source.Length; |
52 } | 52 } |
53 | 53 |
54 protected override void InvokeUnit(int unit) { | 54 protected override void InvokeUnit(int unit) { |
55 try { | 55 try { |
56 m_action(m_source[unit]); | 56 m_action(m_source[unit]); |
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { |
67 readonly Func<TSrc, TDst> m_transform; | 67 readonly Func<TSrc, TDst> m_transform; |
68 readonly TSrc[] m_source; | 68 readonly TSrc[] m_source; |
69 readonly TDst[] m_dest; | 69 readonly TDst[] m_dest; |
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); |
71 readonly TraceContext m_traceContext; | 71 readonly LogicalOperation m_logicalOperation; |
72 | 72 |
73 int m_pending; | 73 int m_pending; |
74 int m_next; | 74 int m_next; |
75 | 75 |
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) |
82 m_next = 0; | 82 m_next = 0; |
83 m_source = source; | 83 m_source = source; |
84 m_dest = new TDst[source.Length]; | 84 m_dest = new TDst[source.Length]; |
85 m_pending = source.Length; | 85 m_pending = source.Length; |
86 m_transform = transform; | 86 m_transform = transform; |
87 m_traceContext = TraceContext.Snapshot(); | 87 m_logicalOperation = TraceContext.Instance.CurrentOperation; |
88 | 88 |
89 m_promise.Anyway(() => Dispose()); | 89 m_promise.On(Dispose, PromiseEventType.All); |
90 m_promise.Cancelled(() => Dispose()); | |
91 | 90 |
92 InitPool(); | 91 InitPool(); |
93 } | 92 } |
94 | 93 |
95 public Promise<TDst[]> Promise { | 94 public Promise<TDst[]> Promise { |
97 return m_promise; | 96 return m_promise; |
98 } | 97 } |
99 } | 98 } |
100 | 99 |
101 protected override void Worker() { | 100 protected override void Worker() { |
102 TraceContext.Fork(m_traceContext); | 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false); |
103 base.Worker(); | 102 try { |
103 base.Worker(); | |
104 } finally { | |
105 TraceContext.Instance.Leave(); | |
106 } | |
104 } | 107 } |
105 | 108 |
106 protected override bool TryDequeue(out int unit) { | 109 protected override bool TryDequeue(out int unit) { |
107 unit = Interlocked.Increment(ref m_next) - 1; | 110 unit = Interlocked.Increment(ref m_next) - 1; |
108 return unit >= m_source.Length ? false : true; | 111 return unit < m_source.Length; |
109 } | 112 } |
110 | 113 |
111 protected override void InvokeUnit(int unit) { | 114 protected override void InvokeUnit(int unit) { |
112 try { | 115 try { |
113 m_dest[unit] = m_transform(m_source[unit]); | 116 m_dest[unit] = m_transform(m_source[unit]); |
138 | 141 |
139 var iter = new ArrayIterator<TSrc>(source, action, threads); | 142 var iter = new ArrayIterator<TSrc>(source, action, threads); |
140 return iter.Promise; | 143 return iter.Promise; |
141 } | 144 } |
142 | 145 |
143 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) { |
144 if (source == null) | 147 if (source == null) |
145 throw new ArgumentNullException("source"); | 148 throw new ArgumentNullException("source"); |
146 if (transform == null) | 149 if (transform == null) |
147 throw new ArgumentNullException("transform"); | 150 throw new ArgumentNullException("transform"); |
148 if (threads <= 0) | 151 if (threads <= 0) |
149 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero"); |
150 | 153 |
151 if (source.Length == 0) | 154 if (source.Length == 0) |
152 return Promise<TDst[]>.ResultToPromise(new TDst[0]); | 155 return Promise<TDst[]>.FromResult(new TDst[0]); |
153 | 156 |
154 var promise = new Promise<TDst[]>(); | 157 var promise = new Promise<TDst[]>(); |
155 var res = new TDst[source.Length]; | 158 var res = new TDst[source.Length]; |
156 var pending = source.Length; | 159 var pending = source.Length; |
157 | 160 |
158 var semaphore = new Semaphore(threads, threads); | 161 object locker = new object(); |
159 | 162 int slots = threads; |
160 AsyncPool.InvokeNewThread(() => { | 163 |
164 // Analysis disable AccessToDisposedClosure | |
165 AsyncPool.RunThread<int>(() => { | |
161 for (int i = 0; i < source.Length; i++) { | 166 for (int i = 0; i < source.Length; i++) { |
162 if(promise.IsResolved) | 167 if(promise.IsResolved) |
163 break; // stop processing in case of error or cancellation | 168 break; // stop processing in case of error or cancellation |
164 var idx = i; | 169 var idx = i; |
165 semaphore.WaitOne(); | 170 |
171 if (Interlocked.Decrement(ref slots) < 0) { | |
172 lock(locker) { | |
173 while(slots < 0) | |
174 Monitor.Wait(locker); | |
175 } | |
176 } | |
177 | |
166 try { | 178 try { |
167 var p1 = transform(source[i]); | 179 transform(source[i]) |
168 p1.Anyway(() => semaphore.Release()); | 180 .On( x => { |
169 p1.Cancelled(() => semaphore.Release()); | 181 Interlocked.Increment(ref slots); |
170 p1.Then( | 182 lock (locker) { |
171 x => { | 183 Monitor.Pulse(locker); |
172 res[idx] = x; | 184 } |
173 var left = Interlocked.Decrement(ref pending); | 185 }) |
174 if (left == 0) | 186 .On( |
175 promise.Resolve(res); | 187 x => { |
176 }, | 188 res[idx] = x; |
177 e => promise.Reject(e) | 189 var left = Interlocked.Decrement(ref pending); |
178 ); | 190 if (left == 0) |
191 promise.Resolve(res); | |
192 }, | |
193 promise.Reject | |
194 ); | |
179 | 195 |
180 } catch (Exception e) { | 196 } catch (Exception e) { |
181 promise.Reject(e); | 197 promise.Reject(e); |
182 } | 198 } |
183 } | 199 } |
184 return 0; | 200 return 0; |
185 }); | 201 }); |
186 | 202 |
187 return promise.Anyway(() => semaphore.Dispose()); | 203 return promise; |
188 } | 204 } |
189 | 205 |
190 } | 206 } |
191 } | 207 } |