Mercurial > pub > ImplabNet
comparison Implab/Parallels/ArrayTraits.cs @ 16:5a4b735ba669 promises
sync
| author | cin | 
|---|---|
| date | Thu, 07 Nov 2013 20:20:26 +0400 | 
| parents | 0f982f9b7d4d | 
| children | e3935fdf59a2 | 
   comparison
  equal
  deleted
  inserted
  replaced
| 15:0f982f9b7d4d | 16:5a4b735ba669 | 
|---|---|
| 37 return m_promise; | 37 return m_promise; | 
| 38 } | 38 } | 
| 39 } | 39 } | 
| 40 | 40 | 
| 41 protected override bool TryDequeue(out int unit) { | 41 protected override bool TryDequeue(out int unit) { | 
| 42 int index; | 42 unit = Interlocked.Increment(ref m_next) - 1; | 
| 43 unit = -1; | 43 return unit >= m_source.Length ? false : true; | 
| 44 do { | |
| 45 index = m_next; | |
| 46 if (index >= m_source.Length) | |
| 47 return false; | |
| 48 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
| 49 | |
| 50 unit = index; | |
| 51 return true; | |
| 52 } | 44 } | 
| 53 | 45 | 
| 54 protected override void InvokeUnit(int unit) { | 46 protected override void InvokeUnit(int unit) { | 
| 55 try { | 47 try { | 
| 56 m_action(m_source[unit]); | 48 m_action(m_source[unit]); | 
| 57 int pending; | 49 var pending = Interlocked.Decrement(ref m_pending); | 
| 58 do { | |
| 59 pending = m_pending; | |
| 60 } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); | |
| 61 pending--; | |
| 62 if (pending == 0) | 50 if (pending == 0) | 
| 63 m_promise.Resolve(m_source.Length); | 51 m_promise.Resolve(m_source.Length); | 
| 64 } catch (Exception e) { | 52 } catch (Exception e) { | 
| 65 m_promise.Reject(e); | 53 m_promise.Reject(e); | 
| 66 } | 54 } | 
| 99 return m_promise; | 87 return m_promise; | 
| 100 } | 88 } | 
| 101 } | 89 } | 
| 102 | 90 | 
| 103 protected override bool TryDequeue(out int unit) { | 91 protected override bool TryDequeue(out int unit) { | 
| 104 int index; | 92 unit = Interlocked.Increment(ref m_next) - 1; | 
| 105 unit = -1; | 93 return unit >= m_source.Length ? false : true; | 
| 106 do { | |
| 107 index = m_next; | |
| 108 if (index >= m_source.Length) | |
| 109 return false; | |
| 110 } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); | |
| 111 | |
| 112 unit = index; | |
| 113 return true; | |
| 114 } | 94 } | 
| 115 | 95 | 
| 116 protected override void InvokeUnit(int unit) { | 96 protected override void InvokeUnit(int unit) { | 
| 117 try { | 97 try { | 
| 118 m_dest[unit] = m_transform(m_source[unit]); | 98 m_dest[unit] = m_transform(m_source[unit]); | 
| 119 int pending; | 99 var pending = Interlocked.Decrement(ref m_pending); | 
| 120 do { | |
| 121 pending = m_pending; | |
| 122 } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); | |
| 123 pending --; | |
| 124 if (pending == 0) | 100 if (pending == 0) | 
| 125 m_promise.Resolve(m_dest); | 101 m_promise.Resolve(m_dest); | 
| 126 } catch (Exception e) { | 102 } catch (Exception e) { | 
| 127 m_promise.Reject(e); | 103 m_promise.Reject(e); | 
| 128 } | 104 } | 
| 146 throw new ArgumentNullException("action"); | 122 throw new ArgumentNullException("action"); | 
| 147 | 123 | 
| 148 var iter = new ArrayIterator<TSrc>(source, action, threads); | 124 var iter = new ArrayIterator<TSrc>(source, action, threads); | 
| 149 return iter.Promise; | 125 return iter.Promise; | 
| 150 } | 126 } | 
| 127 | |
| 128 public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { | |
| 129 if (source == null) | |
| 130 throw new ArgumentNullException("source"); | |
| 131 if (transform == null) | |
| 132 throw new ArgumentNullException("transform"); | |
| 133 if (threads <= 0) | |
| 134 throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); | |
| 135 | |
| 136 var promise = new Promise<TDst[]>(); | |
| 137 var res = new TDst[source.Length]; | |
| 138 var pending = source.Length; | |
| 139 var semaphore = new Semaphore(threads, threads); | |
| 140 | |
| 141 AsyncPool.InvokeNewThread(() => { | |
| 142 for (int i = 0; i < source.Length; i++) { | |
| 143 if(promise.State != PromiseState.Unresolved) | |
| 144 break; // stop processing in case of error or cancellation | |
| 145 var idx = i; | |
| 146 semaphore.WaitOne(); | |
| 147 try { | |
| 148 var p1 = transform(source[i]); | |
| 149 p1.Anyway(() => semaphore.Release()); | |
| 150 p1.Cancelled(() => semaphore.Release()); | |
| 151 p1.Then( | |
| 152 x => { | |
| 153 res[idx] = x; | |
| 154 var left = Interlocked.Decrement(ref pending); | |
| 155 if (left == 0) | |
| 156 promise.Resolve(res); | |
| 157 }, | |
| 158 e => promise.Reject(e) | |
| 159 ); | |
| 160 | |
| 161 } catch (Exception e) { | |
| 162 promise.Reject(e); | |
| 163 } | |
| 164 } | |
| 165 return 0; | |
| 166 }); | |
| 167 | |
| 168 return promise.Anyway(() => semaphore.Dispose()); | |
| 169 } | |
| 151 } | 170 } | 
| 152 } | 171 } | 
