comparison Implab/Parallels/ArrayTraits.cs @ 16:5a4b735ba669 promises

sync
author cin
date Thu, 07 Nov 2013 20:20:26 +0400
parents 0f982f9b7d4d
children e3935fdf59a2
comparison
equal deleted inserted replaced
15:0f982f9b7d4d 16:5a4b735ba669
37 return m_promise; 37 return m_promise;
38 } 38 }
39 } 39 }
40 40
41 protected override bool TryDequeue(out int unit) { 41 protected override bool TryDequeue(out int unit) {
42 int index; 42 unit = Interlocked.Increment(ref m_next) - 1;
43 unit = -1; 43 return unit >= m_source.Length ? false : true;
44 do {
45 index = m_next;
46 if (index >= m_source.Length)
47 return false;
48 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
49
50 unit = index;
51 return true;
52 } 44 }
53 45
54 protected override void InvokeUnit(int unit) { 46 protected override void InvokeUnit(int unit) {
55 try { 47 try {
56 m_action(m_source[unit]); 48 m_action(m_source[unit]);
57 int pending; 49 var pending = Interlocked.Decrement(ref m_pending);
58 do {
59 pending = m_pending;
60 } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending));
61 pending--;
62 if (pending == 0) 50 if (pending == 0)
63 m_promise.Resolve(m_source.Length); 51 m_promise.Resolve(m_source.Length);
64 } catch (Exception e) { 52 } catch (Exception e) {
65 m_promise.Reject(e); 53 m_promise.Reject(e);
66 } 54 }
99 return m_promise; 87 return m_promise;
100 } 88 }
101 } 89 }
102 90
103 protected override bool TryDequeue(out int unit) { 91 protected override bool TryDequeue(out int unit) {
104 int index; 92 unit = Interlocked.Increment(ref m_next) - 1;
105 unit = -1; 93 return unit >= m_source.Length ? false : true;
106 do {
107 index = m_next;
108 if (index >= m_source.Length)
109 return false;
110 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index));
111
112 unit = index;
113 return true;
114 } 94 }
115 95
116 protected override void InvokeUnit(int unit) { 96 protected override void InvokeUnit(int unit) {
117 try { 97 try {
118 m_dest[unit] = m_transform(m_source[unit]); 98 m_dest[unit] = m_transform(m_source[unit]);
119 int pending; 99 var pending = Interlocked.Decrement(ref m_pending);
120 do {
121 pending = m_pending;
122 } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending));
123 pending --;
124 if (pending == 0) 100 if (pending == 0)
125 m_promise.Resolve(m_dest); 101 m_promise.Resolve(m_dest);
126 } catch (Exception e) { 102 } catch (Exception e) {
127 m_promise.Reject(e); 103 m_promise.Reject(e);
128 } 104 }
146 throw new ArgumentNullException("action"); 122 throw new ArgumentNullException("action");
147 123
148 var iter = new ArrayIterator<TSrc>(source, action, threads); 124 var iter = new ArrayIterator<TSrc>(source, action, threads);
149 return iter.Promise; 125 return iter.Promise;
150 } 126 }
127
128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
129 if (source == null)
130 throw new ArgumentNullException("source");
131 if (transform == null)
132 throw new ArgumentNullException("transform");
133 if (threads <= 0)
134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
135
136 var promise = new Promise<TDst[]>();
137 var res = new TDst[source.Length];
138 var pending = source.Length;
139 var semaphore = new Semaphore(threads, threads);
140
141 AsyncPool.InvokeNewThread(() => {
142 for (int i = 0; i < source.Length; i++) {
143 if(promise.State != PromiseState.Unresolved)
144 break; // stop processing in case of error or cancellation
145 var idx = i;
146 semaphore.WaitOne();
147 try {
148 var p1 = transform(source[i]);
149 p1.Anyway(() => semaphore.Release());
150 p1.Cancelled(() => semaphore.Release());
151 p1.Then(
152 x => {
153 res[idx] = x;
154 var left = Interlocked.Decrement(ref pending);
155 if (left == 0)
156 promise.Resolve(res);
157 },
158 e => promise.Reject(e)
159 );
160
161 } catch (Exception e) {
162 promise.Reject(e);
163 }
164 }
165 return 0;
166 });
167
168 return promise.Anyway(() => semaphore.Dispose());
169 }
151 } 170 }
152 } 171 }