comparison Implab/Parallels/ArrayTraits.cs @ 41:2fc0fbe7d58b

Added TraceContext support to array traits
author cin
date Tue, 15 Apr 2014 18:06:34 +0400
parents 8eca2652d2ff
children d9d794b61bb9
comparison
equal deleted inserted replaced
40:fe33f4e02ad5 41:2fc0fbe7d58b
1 using System; 1 using Implab.Diagnostics;
2 using System;
2 using System.Collections.Generic; 3 using System.Collections.Generic;
3 using System.Diagnostics; 4 using System.Diagnostics;
4 using System.Linq; 5 using System.Linq;
5 using System.Text; 6 using System.Text;
6 using System.Threading; 7 using System.Threading;
9 public static class ArrayTraits { 10 public static class ArrayTraits {
10 class ArrayIterator<TSrc> : DispatchPool<int> { 11 class ArrayIterator<TSrc> : DispatchPool<int> {
11 readonly Action<TSrc> m_action; 12 readonly Action<TSrc> m_action;
12 readonly TSrc[] m_source; 13 readonly TSrc[] m_source;
13 readonly Promise<int> m_promise = new Promise<int>(); 14 readonly Promise<int> m_promise = new Promise<int>();
15 readonly TraceContext m_traceContext;
14 16
15 int m_pending; 17 int m_pending;
16 int m_next; 18 int m_next;
17 19
18 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) 20 public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads)
19 : base(threads) { 21 : base(threads) {
20 22
21 Debug.Assert(source != null); 23 Debug.Assert(source != null);
22 Debug.Assert(action != null); 24 Debug.Assert(action != null);
23 25
26 m_traceContext = TraceContext.Snapshot();
24 m_next = 0; 27 m_next = 0;
25 m_source = source; 28 m_source = source;
26 m_pending = source.Length; 29 m_pending = source.Length;
27 m_action = action; 30 m_action = action;
28 31
34 37
35 public Promise<int> Promise { 38 public Promise<int> Promise {
36 get { 39 get {
37 return m_promise; 40 return m_promise;
38 } 41 }
42 }
43
44 protected override void Worker() {
45 TraceContext.Transfer(m_traceContext);
46 base.Worker();
39 } 47 }
40 48
41 protected override bool TryDequeue(out int unit) { 49 protected override bool TryDequeue(out int unit) {
42 unit = Interlocked.Increment(ref m_next) - 1; 50 unit = Interlocked.Increment(ref m_next) - 1;
43 return unit >= m_source.Length ? false : true; 51 return unit >= m_source.Length ? false : true;
58 class ArrayMapper<TSrc, TDst>: DispatchPool<int> { 66 class ArrayMapper<TSrc, TDst>: DispatchPool<int> {
59 readonly Func<TSrc, TDst> m_transform; 67 readonly Func<TSrc, TDst> m_transform;
60 readonly TSrc[] m_source; 68 readonly TSrc[] m_source;
61 readonly TDst[] m_dest; 69 readonly TDst[] m_dest;
62 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); 70 readonly Promise<TDst[]> m_promise = new Promise<TDst[]>();
71 readonly TraceContext m_traceContext;
63 72
64 int m_pending; 73 int m_pending;
65 int m_next; 74 int m_next;
66 75
67 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) 76 public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads)
73 m_next = 0; 82 m_next = 0;
74 m_source = source; 83 m_source = source;
75 m_dest = new TDst[source.Length]; 84 m_dest = new TDst[source.Length];
76 m_pending = source.Length; 85 m_pending = source.Length;
77 m_transform = transform; 86 m_transform = transform;
87 m_traceContext = TraceContext.Snapshot();
78 88
79 m_promise.Anyway(() => Dispose()); 89 m_promise.Anyway(() => Dispose());
80 m_promise.Cancelled(() => Dispose()); 90 m_promise.Cancelled(() => Dispose());
81 91
82 InitPool(); 92 InitPool();
84 94
85 public Promise<TDst[]> Promise { 95 public Promise<TDst[]> Promise {
86 get { 96 get {
87 return m_promise; 97 return m_promise;
88 } 98 }
99 }
100
101 protected override void Worker() {
102 TraceContext.Transfer(m_traceContext);
103 base.Worker();
89 } 104 }
90 105
91 protected override bool TryDequeue(out int unit) { 106 protected override bool TryDequeue(out int unit) {
92 unit = Interlocked.Increment(ref m_next) - 1; 107 unit = Interlocked.Increment(ref m_next) - 1;
93 return unit >= m_source.Length ? false : true; 108 return unit >= m_source.Length ? false : true;