Mercurial > pub > ImplabNet
annotate Implab/PromiseExtensions.cs @ 123:f4d6ea6969cc v2
async queue improvements
author | cin |
---|---|
date | Tue, 13 Jan 2015 01:42:38 +0300 |
parents | 2573b562e328 |
children | a336cb13c6a9 |
rev | line source |
---|---|
72 | 1 using System.Threading; |
75 | 2 using System; |
109 | 3 using Implab.Diagnostics; |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
4 using System.Collections.Generic; |
109 | 5 |
6 | |
75 | 7 #if NET_4_5 |
8 using System.Threading.Tasks; | |
9 #endif | |
72 | 10 |
11 namespace Implab { | |
12 public static class PromiseExtensions { | |
13 public static IPromise<T> DispatchToCurrentContext<T>(this IPromise<T> that) { | |
75 | 14 Safe.ArgumentNotNull(that, "that"); |
72 | 15 var context = SynchronizationContext.Current; |
16 if (context == null) | |
17 return that; | |
18 | |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
19 var p = new SyncContextPromise<T>(context); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
20 p.On(that.Cancel, PromiseEventType.Cancelled); |
72 | 21 |
104 | 22 that.On( |
76 | 23 p.Resolve, |
24 p.Reject, | |
25 p.Cancel | |
72 | 26 ); |
27 return p; | |
28 } | |
29 | |
30 public static IPromise<T> DispatchToContext<T>(this IPromise<T> that, SynchronizationContext context) { | |
75 | 31 Safe.ArgumentNotNull(that, "that"); |
72 | 32 Safe.ArgumentNotNull(context, "context"); |
33 | |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
34 var p = new SyncContextPromise<T>(context); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
35 p.On(that.Cancel, PromiseEventType.Cancelled); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
36 |
72 | 37 |
104 | 38 that.On( |
76 | 39 p.Resolve, |
40 p.Reject, | |
41 p.Cancel | |
72 | 42 ); |
43 return p; | |
44 } | |
75 | 45 |
101 | 46 /// <summary> |
47 /// Ensures the dispatched. | |
48 /// </summary> | |
49 /// <returns>The dispatched.</returns> | |
50 /// <param name="that">That.</param> | |
51 /// <param name="head">Head.</param> | |
52 /// <param name="cleanup">Cleanup.</param> | |
53 /// <typeparam name="TPromise">The 1st type parameter.</typeparam> | |
54 /// <typeparam name="T">The 2nd type parameter.</typeparam> | |
55 public static TPromise EnsureDispatched<TPromise,T>(this TPromise that, IPromise<T> head, Action<T> cleanup) where TPromise : IPromise{ | |
56 Safe.ArgumentNotNull(that, "that"); | |
57 Safe.ArgumentNotNull(head, "head"); | |
58 | |
104 | 59 that.On(null,null,() => head.On(cleanup)); |
101 | 60 |
61 return that; | |
62 } | |
63 | |
75 | 64 public static AsyncCallback AsyncCallback<T>(this Promise<T> that, Func<IAsyncResult,T> callback) { |
65 Safe.ArgumentNotNull(that, "that"); | |
66 Safe.ArgumentNotNull(callback, "callback"); | |
109 | 67 var op = TraceContext.Instance.CurrentOperation; |
75 | 68 return ar => { |
109 | 69 TraceContext.Instance.EnterLogicalOperation(op,false); |
75 | 70 try { |
71 that.Resolve(callback(ar)); | |
72 } catch (Exception err) { | |
73 that.Reject(err); | |
109 | 74 } finally { |
75 TraceContext.Instance.Leave(); | |
75 | 76 } |
77 }; | |
78 } | |
110 | 79 |
80 static void CancelCallback(object cookie) { | |
81 ((ICancellable)cookie).Cancel(); | |
82 } | |
83 | |
84 /// <summary> | |
85 /// Cancells promise after the specified timeout is elapsed. | |
86 /// </summary> | |
87 /// <param name="that">The promise to cancel on timeout.</param> | |
88 /// <param name="milliseconds">The timeout in milliseconds.</param> | |
89 /// <typeparam name="TPromise">The 1st type parameter.</typeparam> | |
90 public static TPromise Timeout<TPromise>(this TPromise that, int milliseconds) where TPromise : IPromise { | |
91 Safe.ArgumentNotNull(that, "that"); | |
92 var timer = new Timer(CancelCallback, that, milliseconds, -1); | |
93 that.On(timer.Dispose, PromiseEventType.All); | |
94 return that; | |
95 } | |
119
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
96 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
97 public static IPromise Combine(this ICollection<IPromise> that) { |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
98 Safe.ArgumentNotNull(that, "that"); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
99 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
100 int count = that.Count; |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
101 var medium = new Promise(); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
102 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
103 foreach (var p in that) |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
104 p.On( |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
105 () => { |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
106 if (Interlocked.Decrement(ref count) == 0) |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
107 medium.Resolve(); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
108 }, |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
109 error => { |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
110 throw new Exception("The dependency promise is failed", error); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
111 }, |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
112 () => { |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
113 throw new OperationCanceledException("The dependency promise is cancelled"); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
114 } |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
115 ); |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
116 |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
117 return medium; |
2573b562e328
Promises rewritten, added improved version of AsyncQueue
cin
parents:
110
diff
changeset
|
118 } |
75 | 119 |
120 #if NET_4_5 | |
121 | |
122 public static Task<T> GetTask<T>(this IPromise<T> that) { | |
123 Safe.ArgumentNotNull(that, "that"); | |
124 var tcs = new TaskCompletionSource<T>(); | |
125 | |
104 | 126 that.On(tcs.SetResult, tcs.SetException, tcs.SetCanceled); |
75 | 127 |
128 return tcs.Task; | |
129 } | |
130 | |
131 #endif | |
72 | 132 } |
133 } | |
134 |