annotate Implab/Parallels/AsyncPool.cs @ 138:f75cfa58e3d4 v2

added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
author cin
date Tue, 17 Feb 2015 18:16:26 +0300
parents a336cb13c6a9
children 706fccb85524
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
35
2880242f987a initial log capabilities
cin
parents: 25
diff changeset
1 using Implab.Diagnostics;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
2 using System;
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
3 using System.Threading;
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
4 using System.Linq;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
5
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
6 namespace Implab.Parallels {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
7 /// <summary>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
8 /// Класс для распаралеливания задач.
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
9 /// </summary>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
10 /// <remarks>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
11 /// Используя данный класс и лямда выражения можно распараллелить
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
12 /// вычисления, для этого используется концепция обещаний.
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
13 /// </remarks>
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
14 public static class AsyncPool {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
15
45
d10034588e38 initial work on interactive logger
cin
parents: 40
diff changeset
16 public static IPromise<T> Invoke<T>(Func<T> func) {
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
17 var p = new Promise<T>();
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
18 var caller = TraceContext.Instance.CurrentOperation;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
19
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
20 ThreadPool.QueueUserWorkItem(param => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
21 TraceContext.Instance.EnterLogicalOperation(caller,false);
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
22 try {
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
23 p.Resolve(func());
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
24 } catch(Exception e) {
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
25 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
26 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
27 TraceContext.Instance.Leave();
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
28 }
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
29 });
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
30
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
31 return p;
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
32 }
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
33
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
34 public static IPromise<T> RunThread<T>(Func<T> func) {
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
35 var p = new Promise<T>();
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
36
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
37 var caller = TraceContext.Instance.CurrentOperation;
35
2880242f987a initial log capabilities
cin
parents: 25
diff changeset
38
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
39 var worker = new Thread(() => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
40 TraceContext.Instance.EnterLogicalOperation(caller,false);
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
41 try {
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
42 p.Resolve(func());
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
43 } catch (Exception e) {
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
44 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
45 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
46 TraceContext.Instance.Leave();
14
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
47 }
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
48 });
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
49 worker.IsBackground = true;
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
50 worker.Start();
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
51
e943453e5039 Implemented interllocked queue
cin
parents: 11
diff changeset
52 return p;
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
53 }
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
54
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
55
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
56 public static IPromise RunThread(Action func) {
119
2573b562e328 Promises rewritten, added improved version of AsyncQueue
cin
parents: 92
diff changeset
57 var p = new Promise();
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
58
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
59 var caller = TraceContext.Instance.CurrentOperation;
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
60
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
61 var worker = new Thread(() => {
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
62 TraceContext.Instance.EnterLogicalOperation(caller,false);
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
63 try {
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
64 func();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
65 p.Resolve();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
66 } catch (Exception e) {
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
67 p.Reject(e);
92
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
68 } finally {
4c0e5ef99986 rewritten tracing
cin
parents: 66
diff changeset
69 TraceContext.Instance.Leave();
48
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
70 }
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
71 });
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
72 worker.IsBackground = true;
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
73 worker.Start();
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
74
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
75 return p;
d9d794b61bb9 Interactive tracing
cin
parents: 45
diff changeset
76 }
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
77
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
78 public static IPromise[] RunThread(params Action[] func) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
79 return func.Select(f => RunThread(f)).ToArray();
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
80 }
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
81
121
62d2f1e98c4e working version of AsyncQueue and batch operations
cin
parents: 120
diff changeset
82 public static IPromise<T>[] RunThread<T>(params Func<T>[] func) {
124
a336cb13c6a9 major update, added Drain mathod to AsyncQueue class
cin
parents: 121
diff changeset
83 return func.Select(f => RunThread(f)).ToArray();
120
f1b897999260 improved asyncpool usability
cin
parents: 119
diff changeset
84 }
25
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
85 }
9bf5b23650c9 refactoring
cin
parents: 16
diff changeset
86 }