comparison Implab/Parallels/ArrayTraits.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents 706fccb85524
children cbe10ac0731e
comparison
equal deleted inserted replaced
71:1714fd8678ef 192:f1da3afc3521
1 using Implab.Diagnostics; 1 using Implab.Diagnostics;
2 using System; 2 using System;
3 using System.Collections.Generic;
4 using System.Diagnostics; 3 using System.Diagnostics;
5 using System.Linq;
6 using System.Text;
7 using System.Threading; 4 using System.Threading;
8 5
9 namespace Implab.Parallels { 6 namespace Implab.Parallels {
10 public static class ArrayTraits { 7 public static class ArrayTraits {
11 class ArrayIterator<TSrc> : DispatchPool<int> { 8 class ArrayIterator<TSrc> : DispatchPool<int> {
12 readonly Action<TSrc> m_action; 9 readonly Action<TSrc> m_action;
13 readonly TSrc[] m_source; 10 readonly TSrc[] m_source;
14 readonly Promise<int> m_promise = new Promise<int>(); 11 readonly Promise<int> m_promise = new Promise<int>();
15 readonly TraceContext m_traceContext; 12 readonly LogicalOperation m_logicalOperation;
16 13
17 int m_pending; 14 int m_pending;
18 int m_next; 15 int m_next;
19 16
20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) 17 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
21 : base(threads) { 18 : base(threads) {
22 19
23 Debug.Assert(source != null); 20 Debug.Assert(source != null);
24 Debug.Assert(action != null); 21 Debug.Assert(action != null);
25 22
26 m_traceContext = TraceContext.Snapshot(); 23 m_logicalOperation = TraceContext.Instance.CurrentOperation;
27 m_next = 0; 24 m_next = 0;
28 m_source = source; 25 m_source = source;
29 m_pending = source.Length; 26 m_pending = source.Length;
30 m_action = action; 27 m_action = action;
31 28
32 m_promise.Anyway(() => Dispose()); 29 m_promise.On(Dispose, PromiseEventType.All);
33 m_promise.Cancelled(() => Dispose());
34 30
35 InitPool(); 31 InitPool();
36 } 32 }
37 33
38 public Promise<int> Promise { 34 public Promise<int> Promise {
40 return m_promise; 36 return m_promise;
41 } 37 }
42 } 38 }
43 39
44 protected override void Worker() { 40 protected override void Worker() {
45 TraceContext.Fork(m_traceContext); 41 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation, false);
46 base.Worker(); 42 try {
43 base.Worker();
44 } finally {
45 TraceContext.Instance.Leave();
46 }
47 } 47 }
48 48
49 protected override bool TryDequeue(out int unit) { 49 protected override bool TryDequeue(out int unit) {
50 unit = Interlocked.Increment(ref m_next) - 1; 50 unit = Interlocked.Increment(ref m_next) - 1;
51 return unit >= m_source.Length ? false : true; 51 return unit < m_source.Length;
52 } 52 }
53 53
54 protected override void InvokeUnit(int unit) { 54 protected override void InvokeUnit(int unit) {
55 try { 55 try {
56 m_action(m_source[unit]); 56 m_action(m_source[unit]);
66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
67 readonly Func<TSrc, TDst> m_transform; 67 readonly Func<TSrc, TDst> m_transform;
68 readonly TSrc[] m_source; 68 readonly TSrc[] m_source;
69 readonly TDst[] m_dest; 69 readonly TDst[] m_dest;
70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly TraceContext m_traceContext; 71 readonly LogicalOperation m_logicalOperation;
72 72
73 int m_pending; 73 int m_pending;
74 int m_next; 74 int m_next;
75 75
76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
82 m_next = 0; 82 m_next = 0;
83 m_source = source; 83 m_source = source;
84 m_dest = new TDst[source.Length]; 84 m_dest = new TDst[source.Length];
85 m_pending = source.Length; 85 m_pending = source.Length;
86 m_transform = transform; 86 m_transform = transform;
87 m_traceContext = TraceContext.Snapshot(); 87 m_logicalOperation = TraceContext.Instance.CurrentOperation;
88 88
89 m_promise.Anyway(() => Dispose()); 89 m_promise.On(Dispose, PromiseEventType.All);
90 m_promise.Cancelled(() => Dispose());
91 90
92 InitPool(); 91 InitPool();
93 } 92 }
94 93
95 public Promise<TDst[]> Promise { 94 public Promise<TDst[]> Promise {
97 return m_promise; 96 return m_promise;
98 } 97 }
99 } 98 }
100 99
101 protected override void Worker() { 100 protected override void Worker() {
102 TraceContext.Fork(m_traceContext); 101 TraceContext.Instance.EnterLogicalOperation(m_logicalOperation,false);
103 base.Worker(); 102 try {
103 base.Worker();
104 } finally {
105 TraceContext.Instance.Leave();
106 }
104 } 107 }
105 108
106 protected override bool TryDequeue(out int unit) { 109 protected override bool TryDequeue(out int unit) {
107 unit = Interlocked.Increment(ref m_next) - 1; 110 unit = Interlocked.Increment(ref m_next) - 1;
108 return unit >= m_source.Length ? false : true; 111 return unit < m_source.Length;
109 } 112 }
110 113
111 protected override void InvokeUnit(int unit) { 114 protected override void InvokeUnit(int unit) {
112 try { 115 try {
113 m_dest[unit] = m_transform(m_source[unit]); 116 m_dest[unit] = m_transform(m_source[unit]);
138 141
139 var iter = new ArrayIterator<TSrc>(source, action, threads); 142 var iter = new ArrayIterator<TSrc>(source, action, threads);
140 return iter.Promise; 143 return iter.Promise;
141 } 144 }
142 145
143 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { 146 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, Func<TSrc, IPromise<TDst>> transform, int threads) {
144 if (source == null) 147 if (source == null)
145 throw new ArgumentNullException("source"); 148 throw new ArgumentNullException("source");
146 if (transform == null) 149 if (transform == null)
147 throw new ArgumentNullException("transform"); 150 throw new ArgumentNullException("transform");
148 if (threads <= 0) 151 if (threads <= 0)
149 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); 152 throw new ArgumentOutOfRangeException("threads","Threads number must be greater then zero");
150 153
151 if (source.Length == 0) 154 if (source.Length == 0)
152 return Promise<TDst[]>.ResultToPromise(new TDst[0]); 155 return Promise<TDst[]>.FromResult(new TDst[0]);
153 156
154 var promise = new Promise<TDst[]>(); 157 var promise = new Promise<TDst[]>();
155 var res = new TDst[source.Length]; 158 var res = new TDst[source.Length];
156 var pending = source.Length; 159 var pending = source.Length;
157 160
158 var semaphore = new Semaphore(threads, threads); 161 object locker = new object();
159 162 int slots = threads;
160 AsyncPool.InvokeNewThread(() => { 163
164 // Analysis disable AccessToDisposedClosure
165 AsyncPool.RunThread<int>(() => {
161 for (int i = 0; i < source.Length; i++) { 166 for (int i = 0; i < source.Length; i++) {
162 if(promise.IsResolved) 167 if(promise.IsResolved)
163 break; // stop processing in case of error or cancellation 168 break; // stop processing in case of error or cancellation
164 var idx = i; 169 var idx = i;
165 semaphore.WaitOne(); 170
171 if (Interlocked.Decrement(ref slots) < 0) {
172 lock(locker) {
173 while(slots < 0)
174 Monitor.Wait(locker);
175 }
176 }
177
166 try { 178 try {
167 var p1 = transform(source[i]); 179 transform(source[i])
168 p1.Anyway(() => semaphore.Release()); 180 .On( x => {
169 p1.Cancelled(() => semaphore.Release()); 181 Interlocked.Increment(ref slots);
170 p1.Then( 182 lock (locker) {
171 x => { 183 Monitor.Pulse(locker);
172 res[idx] = x; 184 }
173 var left = Interlocked.Decrement(ref pending); 185 })
174 if (left == 0) 186 .On(
175 promise.Resolve(res); 187 x => {
176 }, 188 res[idx] = x;
177 e => promise.Reject(e) 189 var left = Interlocked.Decrement(ref pending);
178 ); 190 if (left == 0)
191 promise.Resolve(res);
192 },
193 promise.Reject
194 );
179 195
180 } catch (Exception e) { 196 } catch (Exception e) {
181 promise.Reject(e); 197 promise.Reject(e);
182 } 198 }
183 } 199 }
184 return 0; 200 return 0;
185 }); 201 });
186 202
187 return promise.Anyway(() => semaphore.Dispose()); 203 return promise;
188 } 204 }
189 205
190 } 206 }
191 } 207 }