annotate Implab/Parallels/AsyncPool.cs @ 196:40d7fed4a09e

Fixed promise chaining behavior: the error handler doesn't handle exceptions thrown by the result or cancellation handlers; these exceptions are propagated to the next handlers.
author cin
date Mon, 29 Aug 2016 23:15:51 +0300
parents 706fccb85524
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
35
2880242f987a initial log capabilities
cin
parents: 25
diff changeset
1 using Implab.Diagnostics;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
2 using System;
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
3 using System.Threading;
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
4 using System.Linq;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
5
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
6 namespace Implab.Parallels {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
7 /// <summary>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
8 /// Класс для распараллеливания задач.
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
9 /// </summary>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
10 /// <remarks>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
11 /// Используя данный класс и лямбда-выражения, можно распараллелить
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
12 /// вычисления, для этого используется концепция обещаний.
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
13 /// </remarks>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
14 public static class AsyncPool {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
15
45
d10034588e38 initial work on interactive logger
cin
parents: 40
diff changeset
16 public static IPromise<T> Invoke<T>(Func<T> func) {
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
17 var p = new Promise<T>();
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
18 var caller = TraceContext.Instance.CurrentOperation;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
19
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
20 ThreadPool.QueueUserWorkItem(param => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
22 try {
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
23 p.Resolve(func());
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
24 } catch(Exception e) {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
25 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
26 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
27 TraceContext.Instance.Leave();
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
28 }
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
29 });
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
30
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
31 return p;
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
32 }
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
33
145
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
34 public static IPromise<T> Invoke<T>(Func<ICancellationToken, T> func) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
35 var p = new Promise<T>();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
36 var caller = TraceContext.Instance.CurrentOperation;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
37
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
38 ThreadPool.QueueUserWorkItem(param => {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
39 TraceContext.Instance.EnterLogicalOperation(caller,false);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
40 try {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
41 p.Resolve(func(p));
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
42 } catch(Exception e) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
43 p.Reject(e);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
44 } finally {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
45 TraceContext.Instance.Leave();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
46 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
47 });
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
48
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
49 return p;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
50 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
51
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
52 public static IPromise<T> RunThread<T>(Func<T> func) {
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
53 var p = new Promise<T>();
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
54
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
55 var caller = TraceContext.Instance.CurrentOperation;
35
2880242f987a initial log capabilities
cin
parents: 25
diff changeset
56
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
57 var worker = new Thread(() => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
58 TraceContext.Instance.EnterLogicalOperation(caller,false);
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
59 try {
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
60 p.Resolve(func());
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
61 } catch (Exception e) {
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
62 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
63 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
64 TraceContext.Instance.Leave();
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
65 }
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
66 });
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
67 worker.IsBackground = true;
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
68 worker.Start();
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
69
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
70 return p;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
71 }
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
72
145
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
73 public static IPromise<T> RunThread<T>(Func<ICancellationToken, T> func) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
74 var p = new Promise<T>();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
75
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
76 var caller = TraceContext.Instance.CurrentOperation;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
77
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
78 var worker = new Thread(() => {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
79 TraceContext.Instance.EnterLogicalOperation(caller,false);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
80 try {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
81 p.Resolve(func(p));
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
82 } catch (Exception e) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
83 p.Reject(e);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
84 } finally {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
85 TraceContext.Instance.Leave();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
86 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
87 });
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
88 worker.IsBackground = true;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
89 worker.Start();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
90
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
91 return p;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
92 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
93
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
94
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
95 public static IPromise RunThread(Action func) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents: 92
diff changeset
96 var p = new Promise();
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
97
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
98 var caller = TraceContext.Instance.CurrentOperation;
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
99
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
100 var worker = new Thread(() => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
101 TraceContext.Instance.EnterLogicalOperation(caller,false);
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
102 try {
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
103 func();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
104 p.Resolve();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
105 } catch (Exception e) {
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
106 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
107 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
108 TraceContext.Instance.Leave();
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
109 }
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
110 });
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
111 worker.IsBackground = true;
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
112 worker.Start();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
113
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
114 return p;
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
115 }
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
116
145
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
117 public static IPromise RunThread(Action<ICancellationToken> func) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
118 var p = new Promise();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
119
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
120 var caller = TraceContext.Instance.CurrentOperation;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
121
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
122 var worker = new Thread(() => {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
123 TraceContext.Instance.EnterLogicalOperation(caller,false);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
124 try {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
125 func(p);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
126 p.Resolve();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
127 } catch (Exception e) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
128 p.Reject(e);
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
129 } finally {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
130 TraceContext.Instance.Leave();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
131 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
132 });
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
133 worker.IsBackground = true;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
134 worker.Start();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
135
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
136 return p;
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
137 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
138
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
139 public static IPromise[] RunThread(params Action[] func) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
140 return func.Select(f => RunThread(f)).ToArray();
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
141 }
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
142
145
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
143 public static IPromise[] RunThread(params Action<ICancellationToken>[] func) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
144 return func.Select(f => RunThread(f)).ToArray();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
145 }
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
146
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
147 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
148 return func.Select(f => RunThread(f)).ToArray();
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
149 }
145
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
150
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
151 public static IPromise<T>[] RunThread<T>(params Func<ICancellationToken, T>[] func) {
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
152 return func.Select(f => RunThread(f)).ToArray();
706fccb85524 RC: cancellation support for promises + tests
cin
parents: 124
diff changeset
153 }
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
154 }
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
155 }