Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 16:5a4b735ba669 promises
sync
author | cin |
---|---|
date | Thu, 07 Nov 2013 20:20:26 +0400 |
parents | 0f982f9b7d4d |
children | e3935fdf59a2 |
comparison legend: equal | deleted | inserted | replaced
15:0f982f9b7d4d | 16:5a4b735ba669 |
---|---|
37 return m_promise; | 37 return m_promise; |
38 } | 38 } |
39 } | 39 } |
40 | 40 |
41 protected override bool TryDequeue(out int unit) { | 41 protected override bool TryDequeue(out int unit) { |
42 int index; | 42 unit = Interlocked.Increment(ref m_next) - 1; |
43 unit = -1; | 43 return unit >= m_source.Length ? false : true; |
44 do { | |
45 index = m_next; | |
46 if (index >= m_source.Length) | |
47 return false; | |
48 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
49 | |
50 unit = index; | |
51 return true; | |
52 } | 44 } |
53 | 45 |
54 protected override void InvokeUnit(int unit) { | 46 protected override void InvokeUnit(int unit) { |
55 try { | 47 try { |
56 m_action(m_source[unit]); | 48 m_action(m_source[unit]); |
57 int pending; | 49 var pending = Interlocked.Decrement(ref m_pending); |
58 do { | |
59 pending = m_pending; | |
60 } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); | |
61 pending--; | |
62 if (pending == 0) | 50 if (pending == 0) |
63 m_promise.Resolve(m_source.Length); | 51 m_promise.Resolve(m_source.Length); |
64 } catch (Exception e) { | 52 } catch (Exception e) { |
65 m_promise.Reject(e); | 53 m_promise.Reject(e); |
66 } | 54 } |
99 return m_promise; | 87 return m_promise; |
100 } | 88 } |
101 } | 89 } |
102 | 90 |
103 protected override bool TryDequeue(out int unit) { | 91 protected override bool TryDequeue(out int unit) { |
104 int index; | 92 unit = Interlocked.Increment(ref m_next) - 1; |
105 unit = -1; | 93 return unit >= m_source.Length ? false : true; |
106 do { | |
107 index = m_next; | |
108 if (index >= m_source.Length) | |
109 return false; | |
110 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
111 | |
112 unit = index; | |
113 return true; | |
114 } | 94 } |
115 | 95 |
116 protected override void InvokeUnit(int unit) { | 96 protected override void InvokeUnit(int unit) { |
117 try { | 97 try { |
118 m_dest[unit] = m_transform(m_source[unit]); | 98 m_dest[unit] = m_transform(m_source[unit]); |
119 int pending; | 99 var pending = Interlocked.Decrement(ref m_pending); |
120 do { | |
121 pending = m_pending; | |
122 } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); | |
123 pending --; | |
124 if (pending == 0) | 100 if (pending == 0) |
125 m_promise.Resolve(m_dest); | 101 m_promise.Resolve(m_dest); |
126 } catch (Exception e) { | 102 } catch (Exception e) { |
127 m_promise.Reject(e); | 103 m_promise.Reject(e); |
128 } | 104 } |
146 throw new ArgumentNullException("action"); | 122 throw new ArgumentNullException("action"); |
147 | 123 |
148 var iter = new ArrayIterator<TSrc>(source, action, threads); | 124 var iter = new ArrayIterator<TSrc>(source, action, threads); |
149 return iter.Promise; | 125 return iter.Promise; |
150 } | 126 } |
127 | |
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |
129 if (source == null) | |
130 throw new ArgumentNullException("source"); | |
131 if (transform == null) | |
132 throw new ArgumentNullException("transform"); | |
133 if (threads <= 0) | |
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
135 | |
136 var promise = new Promise<TDst[]>(); | |
137 var res = new TDst[source.Length]; | |
138 var pending = source.Length; | |
139 var semaphore = new Semaphore(threads, threads); | |
140 | |
141 AsyncPool.InvokeNewThread(() => { | |
142 for (int i = 0; i < source.Length; i++) { | |
143 if(promise.State != PromiseState.Unresolved) | |
144 break; // stop processing in case of error or cancellation | |
145 var idx = i; | |
146 semaphore.WaitOne(); | |
147 try { | |
148 var p1 = transform(source[i]); | |
149 p1.Anyway(() => semaphore.Release()); | |
150 p1.Cancelled(() => semaphore.Release()); | |
151 p1.Then( | |
152 x => { | |
153 res[idx] = x; | |
154 var left = Interlocked.Decrement(ref pending); | |
155 if (left == 0) | |
156 promise.Resolve(res); | |
157 }, | |
158 e => promise.Reject(e) | |
159 ); | |
160 | |
161 } catch (Exception e) { | |
162 promise.Reject(e); | |
163 } | |
164 } | |
165 return 0; | |
166 }); | |
167 | |
168 return promise.Anyway(() => semaphore.Dispose()); | |
169 } | |
151 } | 170 } |
152 } | 171 } |