Mercurial > pub > ImplabNet
annotate Implab/Parallels/ArrayTraits.cs @ 33:b255e4aeef17
removed the reference to the parent from the promise object this allows
resolved promises to release parents and results they are holding.
Added complete set of operations to IPromiseBase interface
Subscribing to the cancellation event of the promise should not affect it's
IsExclusive property
More tests.
author | cin |
---|---|
date | Thu, 10 Apr 2014 02:39:29 +0400 |
parents | 8eca2652d2ff |
children | 2fc0fbe7d58b |
rev | line source |
---|---|
15 | 1 using System; |
2 using System.Collections.Generic; | |
3 using System.Diagnostics; | |
4 using System.Linq; | |
5 using System.Text; | |
6 using System.Threading; | |
7 | |
8 namespace Implab.Parallels { | |
9 public static class ArrayTraits { | |
10 class ArrayIterator<TSrc> : DispatchPool<int> { | |
11 readonly Action<TSrc> m_action; | |
12 readonly TSrc[] m_source; | |
13 readonly Promise<int> m_promise = new Promise<int>(); | |
14 | |
15 int m_pending; | |
16 int m_next; | |
17 | |
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) | |
19 : base(threads) { | |
20 | |
21 Debug.Assert(source != null); | |
22 Debug.Assert(action != null); | |
23 | |
24 m_next = 0; | |
25 m_source = source; | |
26 m_pending = source.Length; | |
27 m_action = action; | |
28 | |
29 m_promise.Anyway(() => Dispose()); | |
30 m_promise.Cancelled(() => Dispose()); | |
31 | |
32 InitPool(); | |
33 } | |
34 | |
35 public Promise<int> Promise { | |
36 get { | |
37 return m_promise; | |
38 } | |
39 } | |
40 | |
41 protected override bool TryDequeue(out int unit) { | |
16 | 42 unit = Interlocked.Increment(ref m_next) - 1; |
43 return unit >= m_source.Length ? false : true; | |
15 | 44 } |
45 | |
46 protected override void InvokeUnit(int unit) { | |
47 try { | |
48 m_action(m_source[unit]); | |
16 | 49 var pending = Interlocked.Decrement(ref m_pending); |
15 | 50 if (pending == 0) |
51 m_promise.Resolve(m_source.Length); | |
52 } catch (Exception e) { | |
53 m_promise.Reject(e); | |
54 } | |
55 } | |
56 } | |
57 | |
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { | |
59 readonly Func<TSrc, TDst> m_transform; | |
60 readonly TSrc[] m_source; | |
61 readonly TDst[] m_dest; | |
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); | |
63 | |
64 int m_pending; | |
65 int m_next; | |
66 | |
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) | |
68 : base(threads) { | |
69 | |
70 Debug.Assert (source != null); | |
71 Debug.Assert( transform != null); | |
72 | |
73 m_next = 0; | |
74 m_source = source; | |
75 m_dest = new TDst[source.Length]; | |
76 m_pending = source.Length; | |
77 m_transform = transform; | |
78 | |
79 m_promise.Anyway(() => Dispose()); | |
80 m_promise.Cancelled(() => Dispose()); | |
81 | |
82 InitPool(); | |
83 } | |
84 | |
85 public Promise<TDst[]> Promise { | |
86 get { | |
87 return m_promise; | |
88 } | |
89 } | |
90 | |
91 protected override bool TryDequeue(out int unit) { | |
16 | 92 unit = Interlocked.Increment(ref m_next) - 1; |
93 return unit >= m_source.Length ? false : true; | |
15 | 94 } |
95 | |
96 protected override void InvokeUnit(int unit) { | |
97 try { | |
98 m_dest[unit] = m_transform(m_source[unit]); | |
16 | 99 var pending = Interlocked.Decrement(ref m_pending); |
15 | 100 if (pending == 0) |
101 m_promise.Resolve(m_dest); | |
102 } catch (Exception e) { | |
103 m_promise.Reject(e); | |
104 } | |
105 } | |
106 } | |
107 | |
30 | 108 public static IPromise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { |
15 | 109 if (source == null) |
110 throw new ArgumentNullException("source"); | |
111 if (transform == null) | |
112 throw new ArgumentNullException("transform"); | |
113 | |
114 var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); | |
115 return mapper.Promise; | |
116 } | |
117 | |
30 | 118 public static IPromise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { |
15 | 119 if (source == null) |
120 throw new ArgumentNullException("source"); | |
121 if (action == null) | |
122 throw new ArgumentNullException("action"); | |
123 | |
124 var iter = new ArrayIterator<TSrc>(source, action, threads); | |
125 return iter.Promise; | |
126 } | |
16 | 127 |
26 | 128 public static IPromise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { |
16 | 129 if (source == null) |
130 throw new ArgumentNullException("source"); | |
131 if (transform == null) | |
132 throw new ArgumentNullException("transform"); | |
133 if (threads <= 0) | |
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
135 | |
32 | 136 if (source.Length == 0) |
137 return Promise<TDst[]>.ResultToPromise(new TDst[0]); | |
138 | |
16 | 139 var promise = new Promise<TDst[]>(); |
140 var res = new TDst[source.Length]; | |
141 var pending = source.Length; | |
30 | 142 |
16 | 143 var semaphore = new Semaphore(threads, threads); |
144 | |
145 AsyncPool.InvokeNewThread(() => { | |
146 for (int i = 0; i < source.Length; i++) { | |
19
e3935fdf59a2
Promise is rewritten to use interlocked operations instead of locks
cin
parents:
16
diff
changeset
|
147 if(promise.IsResolved) |
16 | 148 break; // stop processing in case of error or cancellation |
149 var idx = i; | |
150 semaphore.WaitOne(); | |
151 try { | |
152 var p1 = transform(source[i]); | |
153 p1.Anyway(() => semaphore.Release()); | |
154 p1.Cancelled(() => semaphore.Release()); | |
155 p1.Then( | |
156 x => { | |
157 res[idx] = x; | |
158 var left = Interlocked.Decrement(ref pending); | |
159 if (left == 0) | |
160 promise.Resolve(res); | |
161 }, | |
162 e => promise.Reject(e) | |
163 ); | |
164 | |
165 } catch (Exception e) { | |
166 promise.Reject(e); | |
167 } | |
168 } | |
169 return 0; | |
170 }); | |
171 | |
172 return promise.Anyway(() => semaphore.Dispose()); | |
173 } | |
24 | 174 |
15 | 175 } |
176 } |