comparison Implab/Promise.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents 38d6a4db35d7
children f75cfa58e3d4
comparison
equal deleted inserted replaced
118:e046a94eecb1 119:2573b562e328
1 using System; 1 using System;
2 using System.Collections.Generic; 2 using System.Diagnostics;
3 using System.Reflection;
4 using System.Threading;
5 using Implab.Parallels;
6 3
7 namespace Implab { 4 namespace Implab {
8 5 public class Promise : AbstractPromise<Promise.HandlerDescriptor>, IPromise, IDeferred {
9 /// <summary> 6
10 /// Класс для асинхронного получения результатов. Так называемое "обещание". 7 public struct HandlerDescriptor {
11 /// </summary> 8 readonly Action m_success;
12 /// <typeparam name="T">Тип получаемого результата</typeparam> 9 readonly Action<Exception> m_error;
13 /// <remarks> 10 readonly Action m_cancel;
14 /// <para>Сервис при обращении к его методу дает обещаиние о выполнении операции, 11 readonly IDeferred m_deferred;
15 /// клиент получив такое обещание может установить ряд обратных вызово для получения 12
16 /// событий выполнения обещания, тоесть завершения операции и предоставлении результатов.</para> 13 public HandlerDescriptor(Action success, Action<Exception> error, Action cancel, IDeferred deferred) {
17 /// <para> 14 m_success = success;
18 /// Обещение может быть как выполнено, так и выполнено с ошибкой. Для подписки на 15 m_error = error;
19 /// данные события клиент должен использовать методы <c>Then</c>. 16 m_cancel = cancel;
20 /// </para> 17 m_deferred = deferred;
21 /// <para> 18 }
22 /// Сервис, в свою очередь, по окончанию выполнения операции (возможно с ошибкой), 19
23 /// использует методы <c>Resolve</c> либо <c>Reject</c> для оповещения клиетна о 20 public void SignalSuccess() {
24 /// выполнении обещания. 21 if (m_success != null) {
25 /// </para>
26 /// <para>
27 /// Если сервер успел выполнить обещание еще до того, как клиент на него подписался,
28 /// то в момент подписки клиента будут вызваны соответсвующие события в синхронном
29 /// режиме и клиент будет оповещен в любом случае. Иначе, обработчики добавляются в
30 /// список в порядке подписания и в этом же порядке они будут вызваны при выполнении
31 /// обещания.
32 /// </para>
33 /// <para>
34 /// Обрабатывая результаты обещания можно преобразовывать результаты либо инициировать
35 /// связанные асинхронные операции, которые также возвращают обещания. Для этого следует
36 /// использовать соответствующую форму методе <c>Then</c>.
37 /// </para>
38 /// <para>
39 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
40 /// только инициатор обещания иначе могут возникнуть противоречия.
41 /// </para>
42 /// </remarks>
43 public class Promise<T> : IPromise<T> {
44
45 protected abstract class AbstractHandler : MTCustomQueueNode<AbstractHandler> {
46 public abstract void Resolve(T result);
47 public abstract void Reject(Exception error);
48 public abstract void Cancel();
49 }
50
51 protected class RemapDescriptor<T2> : AbstractHandler {
52
53 readonly Func<T,T2> m_resultHandler;
54 readonly Func<Exception,T2> m_errorHandler;
55 readonly Action m_cancellHandler;
56 readonly Promise<T2> m_medium;
57
58 public RemapDescriptor(Func<T,T2> resultHandler, Func<Exception,T2> errorHandler, Action cancelHandler, Promise<T2> medium) {
59 m_resultHandler = resultHandler;
60 m_errorHandler = errorHandler;
61 m_cancellHandler = cancelHandler;
62 m_medium = medium;
63 }
64
65 public override void Resolve(T result) {
66 if (m_resultHandler != null) {
67 try { 22 try {
68 if (m_medium != null) 23 m_success();
69 m_medium.Resolve(m_resultHandler(result)); 24 if (m_deferred != null)
70 else 25 m_deferred.Resolve();
71 m_resultHandler(result); 26 } catch (Exception err) {
72 } catch (Exception e) { 27 SignalError(err);
73 Reject(e); 28 }
74 } 29 }
75 } else if(m_medium != null) 30 }
76 m_medium.Resolve(default(T2)); 31
77 } 32 public void SignalError(Exception err) {
78 33 if (m_error != null) {
79 public override void Reject(Exception error) {
80 if (m_errorHandler != null) {
81 try { 34 try {
82 var res = m_errorHandler(error); 35 m_error(err);
83 if (m_medium != null) 36 if (m_deferred != null)
84 m_medium.Resolve(res); 37 m_deferred.Resolve();
85 } catch (Exception err2) { 38 } catch (Exception err2) {
86 if (m_medium != null) 39 if (m_deferred != null)
87 m_medium.Reject(err2); 40 m_deferred.Reject(err2);
88 } 41 }
89 } else if (m_medium != null) 42 } else {
90 m_medium.Reject(error); 43 if (m_deferred != null)
91 } 44 m_deferred.Reject(err);
92 45 }
93 public override void Cancel() { 46 }
94 if (m_cancellHandler != null) { 47
48 public void SignalCancel() {
49 if (m_cancel != null) {
95 try { 50 try {
96 m_cancellHandler(); 51 m_cancel();
52 if (m_deferred != null)
53 m_deferred.Resolve();
97 } catch (Exception err) { 54 } catch (Exception err) {
98 Reject(err); 55 SignalError(err);
56 }
57 } else {
58 if (m_deferred != null)
59 m_deferred.Cancel();
60 }
61 }
62 }
63
64 public void Resolve() {
65 BeginSetResult();
66 EndSetResult();
67 }
68
69 public void Reject(Exception error) {
70 SetError(error);
71 }
72
73 #region implemented abstract members of AbstractPromise
74
75 protected override void SignalSuccess(HandlerDescriptor handler) {
76 handler.SignalSuccess();
77 }
78
79 protected override void SignalError(HandlerDescriptor handler, Exception error) {
80 handler.SignalError(error);
81 }
82
83 protected override void SignalCancelled(HandlerDescriptor handler) {
84 handler.SignalCancel();
85 }
86
87 protected override void Listen(PromiseEventType events, Action handler) {
88 AddHandler(new HandlerDescriptor(
89 events.HasFlag(PromiseEventType.Success) ? handler : null,
90 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
91 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
92 null
93 ));
94 }
95
96 #endregion
97
98
99 public Type PromiseType {
100 get {
101 return typeof(void);
102 }
103 }
104
105 public IPromise Then(Action success, Action<Exception> error, Action cancel) {
106 var promise = new Promise();
107 if (success != null)
108 promise.On(Cancel, PromiseEventType.Cancelled);
109
110 AddHandler(new HandlerDescriptor(success, error, cancel, promise));
111
112 return promise;
113 }
114
115 public IPromise Then(Action success, Action<Exception> error) {
116 return Then(success, error, null);
117 }
118
119 public IPromise Then(Action success) {
120 return Then(success, null, null);
121 }
122
123 public IPromise On(Action success, Action<Exception> error, Action cancel) {
124 AddHandler(new HandlerDescriptor(success, error, cancel, null));
125 return this;
126 }
127
128 public IPromise On(Action success, Action<Exception> error) {
129 return On(success, error, null);
130 }
131
132 public IPromise On(Action success) {
133 return On(success, null, null);
134 }
135
136 public IPromise On(Action handler, PromiseEventType events) {
137 return On(
138 events.HasFlag(PromiseEventType.Success) ? handler : null,
139 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>(err => handler()) : null,
140 events.HasFlag(PromiseEventType.Cancelled) ? handler : null
141 );
142 }
143
144 public IPromise<T> Cast<T>() {
145 throw new InvalidCastException();
146 }
147
148 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error, Func<IPromise> cancel) {
149 var medium = new Promise();
150
151 On(
152 () => {
153 if (medium.IsCancelled)
99 return; 154 return;
100 } 155 if (chained != null)
101 } 156 ConnectPromise(chained(), medium);
102 if (m_medium != null) 157 },
103 m_medium.Cancel(); 158 ex => {
104 } 159 if (medium.IsCancelled)
105 }
106
107 protected class HandlerDescriptor : AbstractHandler {
108
109 readonly Action<T> m_resultHandler;
110 readonly Action<Exception> m_errorHandler;
111 readonly Action m_cancellHandler;
112 readonly Promise<T> m_medium;
113
114 public HandlerDescriptor(Action<T> resultHandler, Action<Exception> errorHandler, Action cancelHandler, Promise<T> medium) {
115 m_resultHandler = resultHandler;
116 m_errorHandler = errorHandler;
117 m_cancellHandler = cancelHandler;
118 m_medium = medium;
119 }
120
121 public override void Resolve(T result) {
122 if (m_resultHandler != null) {
123 try {
124 m_resultHandler(result);
125 } catch (Exception e) {
126 Reject(e);
127 return; 160 return;
128 } 161 if (error != null) {
129 } 162 try {
130 if(m_medium != null) 163 ConnectPromise(error(ex), medium);
131 m_medium.Resolve(result); 164 } catch (Exception ex2) {
132 } 165 medium.Reject(ex2);
133 166 }
134 public override void Reject(Exception error) { 167 } else {
135 if (m_errorHandler != null) { 168 medium.Reject(ex);
136 try { 169 }
137 m_errorHandler(error); 170 },
138 if (m_medium != null) 171 () => {
139 m_medium.Resolve(default(T)); 172 if (medium.IsCancelled)
140 } catch (Exception err2) {
141 if (m_medium != null)
142 m_medium.Reject(err2);
143 }
144 } else if (m_medium != null)
145 m_medium.Reject(error);
146 }
147
148 public override void Cancel() {
149 if (m_cancellHandler != null) {
150 try {
151 m_cancellHandler();
152 } catch (Exception err) {
153 Reject(err);
154 return; 173 return;
155 } 174 if (cancel != null)
156 } 175 ConnectPromise(cancel(), medium);
157 if (m_medium != null) 176 else
158 m_medium.Cancel(); 177 medium.Cancel();
159 } 178 }
160 } 179 );
161 180
162 const int UNRESOLVED_SATE = 0; 181 if (chained != null)
163 const int TRANSITIONAL_STATE = 1; 182 medium.On(Cancel, PromiseEventType.Cancelled);
164 const int SUCCEEDED_STATE = 2;
165 const int REJECTED_STATE = 3;
166 const int CANCELLED_STATE = 4;
167
168 int m_childrenCount;
169 int m_state;
170 T m_result;
171 Exception m_error;
172
173 readonly MTCustomQueue<AbstractHandler> m_handlers = new MTCustomQueue<AbstractHandler>();
174 //readonly MTQueue<AbstractHandler> m_handlers = new MTQueue<AbstractHandler>();
175
176 public Promise() {
177 }
178
179 public Promise(IPromise parent) {
180 if (parent != null)
181 AddHandler(
182 null,
183 null,
184 () => {
185 if (parent.IsExclusive)
186 parent.Cancel();
187 },
188 null,
189 false
190 );
191 }
192
193 bool BeginTransit() {
194 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
195 }
196
197 void CompleteTransit(int state) {
198 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
199 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
200 }
201
202 void WaitTransition() {
203 while (m_state == TRANSITIONAL_STATE) {
204 Thread.MemoryBarrier();
205 }
206 }
207
208 public bool IsResolved {
209 get {
210 Thread.MemoryBarrier();
211 return m_state > 1;
212 }
213 }
214
215 public bool IsCancelled {
216 get {
217 Thread.MemoryBarrier();
218 return m_state == CANCELLED_STATE;
219 }
220 }
221
222 public Type PromiseType {
223 get { return typeof(T); }
224 }
225
226 /// <summary>
227 /// Выполняет обещание, сообщая об успешном выполнении.
228 /// </summary>
229 /// <param name="result">Результат выполнения.</param>
230 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
231 public void Resolve(T result) {
232 if (BeginTransit()) {
233 m_result = result;
234 CompleteTransit(SUCCEEDED_STATE);
235 OnStateChanged();
236 } else {
237 WaitTransition();
238 if (m_state != CANCELLED_STATE)
239 throw new InvalidOperationException("The promise is already resolved");
240 }
241 }
242
243 /// <summary>
244 /// Выполняет обещание, сообщая об успешном выполнении. Результатом выполнения будет пустое значения.
245 /// </summary>
246 /// <remarks>
247 /// Данный вариант удобен в случаях, когда интересен факт выполнения операции, нежели полученное значение.
248 /// </remarks>
249 public void Resolve() {
250 Resolve(default(T));
251 }
252
253 /// <summary>
254 /// Выполняет обещание, сообщая об ошибке
255 /// </summary>
256 /// <remarks>
257 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
258 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
259 /// будут проигнорированы.
260 /// </remarks>
261 /// <param name="error">Исключение возникшее при выполнении операции</param>
262 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
263 public void Reject(Exception error) {
264 if (BeginTransit()) {
265 m_error = error is TransientPromiseException ? error.InnerException : error;
266 CompleteTransit(REJECTED_STATE);
267 OnStateChanged();
268 } else {
269 WaitTransition();
270 if (m_state == SUCCEEDED_STATE)
271 throw new InvalidOperationException("The promise is already resolved");
272 }
273 }
274
275 /// <summary>
276 /// Отменяет операцию, если это возможно.
277 /// </summary>
278 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
279 public void Cancel() {
280 if (BeginTransit()) {
281 CompleteTransit(CANCELLED_STATE);
282 OnStateChanged();
283 }
284 }
285
286 /// <summary>
287 /// Последний обработчик в цепочки обещаний.
288 /// </summary>
289 /// <param name="success"></param>
290 /// <param name="error"></param>
291 /// <param name="cancel"></param>
292 /// <remarks>
293 /// <para>
294 /// Данный метод не создает связанного с текущим обещания и предназначен для окончания
295 /// фсинхронной цепочки.
296 /// </para>
297 /// <para>
298 /// Если данный метод вызвать несколько раз, либо добавить другие обработчики, то цепочка
299 /// не будет одиночной <see cref="IsExclusive"/> и, как следствие, будет невозможна отмена
300 /// всей цепи обещаний снизу (с самого последнего обещания).
301 /// </para>
302 /// </remarks>
303 public void On(Action<T> success, Action<Exception> error, Action cancel) {
304 if (success == null && error == null && cancel == null)
305 return;
306
307 AddHandler(success, error, cancel, null, false);
308 }
309
310 public void On(Action<T> success, Action<Exception> error) {
311 AddHandler(success, error, null, null, false);
312 }
313
314 public void On(Action<T> success) {
315 AddHandler(success, null, null, null, false);
316 }
317
318 public void On(Action handler, PromiseEventType events) {
319 Safe.ArgumentNotNull(handler, "handler");
320
321
322 AddHandler(
323 events.HasFlag(PromiseEventType.Success) ? new Action<T>(x => handler()) : null,
324 events.HasFlag(PromiseEventType.Error) ? new Action<Exception>( x => handler()) : null,
325 events.HasFlag(PromiseEventType.Cancelled) ? handler : null,
326 null,
327 false
328 );
329 }
330
331 public IPromise Error(Action<Exception> error) {
332 if (error == null)
333 return this;
334
335 var medium = new Promise<T>(this);
336
337 AddMappers(
338 x => x,
339 e => {
340 error(e);
341 return default(T);
342 },
343 null,
344 medium,
345 true
346 );
347 183
348 return medium; 184 return medium;
349 } 185 }
350 186
351 /// <summary> 187 static void ConnectPromise(IPromise result, Promise medium) {
352 /// Handles error and allows to keep the promise. 188 if (result != null) {
353 /// </summary> 189 result.On(
354 /// <remarks>
355 /// If the specified handler throws an exception, this exception will be used to reject the promise.
356 /// </remarks>
357 /// <param name="handler">The error handler which returns the result of the promise.</param>
358 /// <returns>New promise.</returns>
359 public IPromise<T> Error(Func<Exception,T> handler) {
360 if (handler == null)
361 return this;
362
363 var medium = new Promise<T>(this);
364
365 AddMappers(x => x, handler, null, medium, true);
366
367 return medium;
368 }
369
370 /// <summary>
371 /// Позволяет преобразовать результат выполения операции к новому типу.
372 /// </summary>
373 /// <typeparam name="TNew">Новый тип результата.</typeparam>
374 /// <param name="mapper">Преобразование результата к новому типу.</param>
375 /// <param name="error">Обработчик ошибки. Данный обработчик получит
376 /// исключение возникшее при выполнении операции.</param>
377 /// <returns>Новое обещание, которое будет выполнено при выполнении исходного обещания.</returns>
378 /// <param name = "cancel"></param>
379 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error, Action cancel) {
380 Safe.ArgumentNotNull(mapper, "mapper");
381
382 // создаем прицепленное обещание
383 var medium = new Promise<TNew>(this);
384
385 AddMappers(
386 mapper,
387 error,
388 cancel,
389 medium,
390 true
391 );
392
393 return medium;
394 }
395
396 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper, Func<Exception,TNew> error) {
397 return Then(mapper, error, null);
398 }
399
400 public IPromise<TNew> Then<TNew>(Func<T, TNew> mapper) {
401 return Then(mapper, null, null);
402 }
403
404 /// <summary>
405 /// Сцепляет несколько аснхронных операций. Указанная асинхронная операция будет вызвана после
406 /// выполнения текущей, а результат текущей операции может быть использован для инициализации
407 /// новой операции.
408 /// </summary>
409 /// <typeparam name="TNew">Тип результата указанной асинхронной операции.</typeparam>
410 /// <param name="chained">Асинхронная операция, которая должна будет начаться после выполнения текущей.</param>
411 /// <param name="error">Обработчик ошибки. Данный обработчик получит
412 /// исключение возникшее при выполнении текуещй операции.</param>
413 /// <returns>Новое обещание, которое будет выполнено по окончанию указанной аснхронной операции.</returns>
414 /// <param name = "cancel"></param>
415 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error, Action cancel) {
416
417 Safe.ArgumentNotNull(chained, "chained");
418
419 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
420 // создать посредника, к которому будут подвызяваться следующие обработчики.
421 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
422 // передать через него результаты работы.
423 var medium = new Promise<TNew>(this);
424
425 Func<T,T> resultHandler = delegate(T result) {
426 if (medium.IsCancelled)
427 return default(T);
428
429 var promise = chained(result);
430
431 promise.On(
432 medium.Resolve, 190 medium.Resolve,
433 medium.Reject, 191 medium.Reject,
434 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка 192 () => medium.Reject(new OperationCanceledException())
435 ); 193 );
436 194 medium.On(result.Cancel, PromiseEventType.Cancelled);
437 // notify chained operation that it's not needed anymore 195 } else {
438 // порядок вызова Then, Cancelled важен, поскольку от этого 196 medium.Reject(
439 // зависит IsExclusive 197 new NullReferenceException(
440 medium.On( 198 "The chained asynchronous operation returned" +
441 null, 199 " 'null' where the promise instance is expected"
442 null, 200 )
443 () => {
444 if (promise.IsExclusive)
445 promise.Cancel();
446 }
447 ); 201 );
448 202 }
449 return default(T); 203 }
450 }; 204
451 205 public IPromise Chain(Func<IPromise> chained, Func<Exception, IPromise> error) {
452 Func<Exception,T> errorHandler; 206 return Chain(chained, error, null);
453 207 }
454 if (error != null) 208
455 errorHandler = delegate(Exception e) { 209 public IPromise Chain(Func<IPromise> chained) {
456 try { 210 return Chain(chained, null, null);
457 var promise = error(e); 211 }
458 212
459 promise.On( 213 public IPromise Error(Action<Exception> error) {
460 medium.Resolve, 214 var promise = new Promise();
461 medium.Reject, 215 On(
462 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
463 );
464
465 // notify chained operation that it's not needed anymore
466 // порядок вызова Then, Cancelled важен, поскольку от этого
467 // зависит IsExclusive
468 medium.Cancelled(() => {
469 if (promise.IsExclusive)
470 promise.Cancel();
471 });
472 } catch (Exception e2) {
473 medium.Reject(e2);
474 }
475 return default(T);
476 };
477 else
478 errorHandler = err => {
479 medium.Reject(err);
480 return default(T);
481 };
482
483
484 Action cancelHandler;
485 if (cancel != null)
486 cancelHandler = () => {
487 if (cancel != null)
488 cancel();
489 medium.Cancel();
490 };
491 else
492 cancelHandler = medium.Cancel;
493
494 AddMappers(
495 resultHandler,
496 errorHandler,
497 cancelHandler,
498 null, 216 null,
499 true 217 err => {
500 ); 218 if (error != null)
501 219 try {
502 return medium; 220 error(err);
503 } 221 promise.Resolve();
504 222 } catch (Exception err2) {
505 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained, Func<Exception,IPromise<TNew>> error) { 223 promise.Reject(err2);
506 return Chain(chained, error, null); 224 }
507 } 225 else
508 226 promise.Reject(err);
509 public IPromise<TNew> Chain<TNew>(Func<T, IPromise<TNew>> chained) { 227 }
510 return Chain(chained, null, null); 228 );
511 } 229
512 230 return promise;
513 public IPromise<T> Cancelled(Action handler) { 231 }
514 var medium = new Promise<T>(this); 232
515 AddHandler(null, null, handler, medium, false); 233 public IPromise Cancelled(Action handler) {
516 return medium; 234 var promise = new Promise();
517 } 235 On(
518 236 null,
519 /// <summary> 237 null,
520 /// Adds the specified handler for all cases (success, error, cancel)
521 /// </summary>
522 /// <param name="handler">The handler that will be called anyway</param>
523 /// <returns>self</returns>
524 public IPromise<T> Anyway(Action handler) {
525 Safe.ArgumentNotNull(handler, "handler");
526
527 var medium = new Promise<T>(this);
528
529 AddHandler(
530 x => handler(),
531 e => {
532 handler();
533 throw new TransientPromiseException(e);
534 },
535 handler,
536 medium,
537 true
538 );
539
540 return medium;
541 }
542
543 /// <summary>
544 /// Преобразует результат обещания к нужному типу
545 /// </summary>
546 /// <typeparam name="T2"></typeparam>
547 /// <returns></returns>
548 public IPromise<T2> Cast<T2>() {
549 return Then(x => (T2)(object)x, null);
550 }
551
552 /// <summary>
553 /// Дожидается отложенного обещания и в случае успеха, возвращает
554 /// его, результат, в противном случае бросает исключение.
555 /// </summary>
556 /// <remarks>
557 /// <para>
558 /// Если ожидание обещания было прервано по таймауту, это не значит,
559 /// что обещание было отменено или что-то в этом роде, это только
560 /// означает, что мы его не дождались, однако все зарегистрированные
561 /// обработчики, как были так остались и они будут вызваны, когда
562 /// обещание будет выполнено.
563 /// </para>
564 /// <para>
565 /// Такое поведение вполне оправдано поскольку таймаут может истечь
566 /// в тот момент, когда началась обработка цепочки обработчиков, и
567 /// к тому же текущее обещание может стоять в цепочке обещаний и его
568 /// отклонение может привести к непрогнозируемому результату.
569 /// </para>
570 /// </remarks>
571 /// <param name="timeout">Время ожидания</param>
572 /// <returns>Результат выполнения обещания</returns>
573 public T Join(int timeout) {
574 var evt = new ManualResetEvent(false);
575 Anyway(() => evt.Set());
576
577 if (!evt.WaitOne(timeout, true))
578 throw new TimeoutException();
579
580 switch (m_state) {
581 case SUCCEEDED_STATE:
582 return m_result;
583 case CANCELLED_STATE:
584 throw new OperationCanceledException();
585 case REJECTED_STATE:
586 throw new TargetInvocationException(m_error);
587 default:
588 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
589 }
590 }
591
592 public T Join() {
593 return Join(Timeout.Infinite);
594 }
595
596 void AddMappers<T2>(Func<T,T2> success, Func<Exception,T2> error, Action cancel, Promise<T2> medium, bool inc) {
597 if (inc)
598 Interlocked.Increment(ref m_childrenCount);
599
600 AbstractHandler handler = new RemapDescriptor<T2>(success, error, cancel, medium);
601
602 bool queued;
603
604 if (!IsResolved) {
605 m_handlers.Enqueue(handler);
606 queued = true;
607 } else {
608 // the promise is in resolved state, just invoke the handled with minimum overhead
609 queued = false;
610 InvokeHandler(handler);
611 }
612
613 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
614 // if the promise have been resolved while we was adding handler to the queue
615 // we can't guarantee that someone is still processing it
616 // therefore we will fetch a handler from the queue and execute it
617 // note that fetched handler may be not the one that we have added
618 // even we can fetch no handlers at all :)
619 InvokeHandler(handler);
620 }
621
622 void AddHandler(Action<T> success, Action<Exception> error, Action cancel, Promise<T> medium, bool inc) {
623 if (inc)
624 Interlocked.Increment(ref m_childrenCount);
625
626 AbstractHandler handler = new HandlerDescriptor(success, error, cancel, medium);
627
628 bool queued;
629
630 if (!IsResolved) {
631 m_handlers.Enqueue(handler);
632 queued = true;
633 } else {
634 // the promise is in resolved state, just invoke the handled with minimum overhead
635 queued = false;
636 InvokeHandler(handler);
637 }
638
639 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
640 // if the promise have been resolved while we was adding handler to the queue
641 // we can't guarantee that someone is still processing it
642 // therefore we will fetch a handler from the queue and execute it
643 // note that fetched handler may be not the one that we have added
644 // even we can fetch no handlers at all :)
645 InvokeHandler(handler);
646 }
647
648 protected virtual void InvokeHandler(AbstractHandler handler) {
649 switch (m_state) {
650 case SUCCEEDED_STATE:
651 handler.Resolve(m_result);
652 break;
653 case REJECTED_STATE:
654 handler.Reject(m_error);
655 break;
656 case CANCELLED_STATE:
657 handler.Cancel();
658 break;
659 default:
660 // do nothing
661 return;
662 }
663 }
664
665 void OnStateChanged() {
666 AbstractHandler handler;
667 while (m_handlers.TryDequeue(out handler))
668 InvokeHandler(handler);
669 }
670
671 public bool IsExclusive {
672 get {
673 return m_childrenCount <= 1;
674 }
675 }
676
677 /// <summary>
678 /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
679 /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
680 /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
681 /// </summary>
682 /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
683 /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
684 /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
685 public static IPromise<T[]> CreateComposite(IList<IPromise<T>> promises) {
686 if (promises == null)
687 throw new ArgumentNullException();
688
689 // создаем аккумулятор для результатов и результирующее обещание
690 var result = new T[promises.Count];
691 var promise = new Promise<T[]>();
692
693 // special case
694 if (promises.Count == 0) {
695 promise.Resolve(result);
696 return promise;
697 }
698
699 int pending = promises.Count;
700
701 for (int i = 0; i < promises.Count; i++) {
702 var dest = i;
703
704 if (promises[i] != null) {
705 promises[i].On(
706 x => {
707 result[dest] = x;
708 if (Interlocked.Decrement(ref pending) == 0)
709 promise.Resolve(result);
710 },
711 promise.Reject
712 );
713 } else {
714 if (Interlocked.Decrement(ref pending) == 0)
715 promise.Resolve(result);
716 }
717 }
718
719 promise.Cancelled(
720 () => { 238 () => {
721 foreach (var d in promises) 239 if (handler != null) {
722 if (d != null && d.IsExclusive) 240 try {
723 d.Cancel(); 241 handler();
242 promise.Resolve();
243 } catch (Exception err) {
244 promise.Reject(err);
245 }
246 } else {
247 promise.Cancel();
248 }
724 } 249 }
725 ); 250 );
726 251
727 return promise; 252 return promise;
728 } 253 }
729 254
730 /// <summary> 255
731 /// Объединяет несколько обещаний в одно. Результирующее обещание будет выполнено при
732 /// выполнении всех указанных обещаний. При этом возвращаемые значения первичных обещаний
733 /// игнорируются.
734 /// </summary>
735 /// <param name="promises">Коллекция первичных обещаний, которые будут объеденены в одно.</param>
736 /// <returns>Новое обещание, объединяющее в себе переданные.</returns>
737 /// <remarks>
738 /// Если в коллекции встречаюься <c>null</c>, то они воспринимаются как выполненные обещания.
739 /// </remarks>
740 public static IPromise CreateComposite(ICollection<IPromise> promises) {
741 if (promises == null)
742 throw new ArgumentNullException();
743 if (promises.Count == 0)
744 return Promise<object>.ResultToPromise(null);
745
746 int countdown = promises.Count;
747
748 var result = new Promise<object>();
749
750 foreach (var d in promises) {
751 if (d == null) {
752 if (Interlocked.Decrement(ref countdown) == 0)
753 result.Resolve(null);
754 } else {
755 d.Then(() => {
756 if (Interlocked.Decrement(ref countdown) == 0)
757 result.Resolve(null);
758 });
759 }
760 }
761
762 result.Cancelled(() => {
763 foreach (var d in promises)
764 if (d != null && d.IsExclusive)
765 d.Cancel();
766 });
767
768 return result;
769 }
770
771 public static Promise<T> ResultToPromise(T result) {
772 var p = new Promise<T>();
773 p.Resolve(result);
774 return p;
775 }
776
777 public static Promise<T> ExceptionToPromise(Exception error) {
778 if (error == null)
779 throw new ArgumentNullException();
780
781 var p = new Promise<T>();
782 p.Reject(error);
783 return p;
784 }
785
786 #region IPromiseBase explicit implementation
787
788 IPromise IPromise.Then(Action success, Action<Exception> error, Action cancel) {
789 return Then(
790 success != null ? new Func<T,T>(x => {
791 success();
792 return x;
793 }) : null,
794 error != null ? new Func<Exception,T>(e => {
795 error(e);
796 return default(T);
797 }) : null,
798 cancel
799 );
800 }
801
802 IPromise IPromise.Then(Action success, Action<Exception> error) {
803 return Then(
804 success != null ? new Func<T,T>(x => {
805 success();
806 return x;
807 }) : null,
808 error != null ? new Func<Exception,T>(e => {
809 error(e);
810 return default(T);
811 }) : null
812 );
813 }
814
815 IPromise IPromise.Then(Action success) {
816 Safe.ArgumentNotNull(success, "success");
817 return Then(x => {
818 success();
819 return x;
820 });
821 }
822
823 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
824 return ChainNoResult(chained, error, cancel);
825 }
826
827 IPromise ChainNoResult(Func<IPromise> chained, Func<Exception,IPromise> error, Action cancel) {
828 Safe.ArgumentNotNull(chained, "chained");
829
830 var medium = new Promise<object>(this);
831
832 Func<T,T> resultHandler = delegate {
833 if (medium.IsCancelled)
834 return default(T);
835
836 var promise = chained();
837
838 promise.On(
839 medium.Resolve,
840 medium.Reject,
841 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
842 );
843
844 // notify chained operation that it's not needed anymore
845 // порядок вызова Then, Cancelled важен, поскольку от этого
846 // зависит IsExclusive
847 medium.Cancelled(() => {
848 if (promise.IsExclusive)
849 promise.Cancel();
850 });
851
852 return default(T);
853 };
854
855 Func<Exception,T> errorHandler;
856
857 if (error != null)
858 errorHandler = delegate(Exception e) {
859 try {
860 var promise = error(e);
861
862 promise.On(
863 medium.Resolve,
864 medium.Reject,
865 () => medium.Reject(new OperationCanceledException()) // внешняя отмена связанной операции рассматривается как ошибка
866 );
867
868 // notify chained operation that it's not needed anymore
869 // порядок вызова Then, Cancelled важен, поскольку от этого
870 // зависит IsExclusive
871 medium.Cancelled(() => {
872 if (promise.IsExclusive)
873 promise.Cancel();
874 });
875 } catch (Exception e2) {
876 medium.Reject(e2);
877 }
878 return default(T);
879 };
880 else
881 errorHandler = err => {
882 medium.Reject(err);
883 return default(T);
884 };
885
886
887 Action cancelHandler;
888 if (cancel != null)
889 cancelHandler = () => {
890 if (cancel != null)
891 cancel();
892 medium.Cancel();
893 };
894 else
895 cancelHandler = medium.Cancel;
896
897 AddMappers(
898 resultHandler,
899 errorHandler,
900 cancelHandler,
901 null,
902 true
903 );
904
905 return medium;
906 }
907
908 IPromise IPromise.Chain(Func<IPromise> chained, Func<Exception,IPromise> error) {
909 return ChainNoResult(chained, error, null);
910 }
911
912 IPromise IPromise.Chain(Func<IPromise> chained) {
913 return ChainNoResult(chained, null, null);
914 }
915
916
917 void IPromise.On(Action success, Action<Exception> error, Action cancel) {
918 On(success != null ? new Action<T>(x => success()) : null, error, cancel);
919 }
920
921 void IPromise.On(Action success, Action<Exception> error) {
922 On(x => success(), error, null);
923 }
924
925 void IPromise.On(Action success) {
926 On(x => success(), null, null);
927 }
928
929 IPromise IPromise.Error(Action<Exception> error) {
930 return Error(error);
931 }
932
933 IPromise IPromise.Anyway(Action handler) {
934 return Anyway(handler);
935 }
936
937 IPromise IPromise.Cancelled(Action handler) {
938 return Cancelled(handler);
939 }
940
941 void IPromise.Join() {
942 Join();
943 }
944
945 void IPromise.Join(int timeout) {
946 Join(timeout);
947 }
948
949 #endregion
950
951
952
953 } 256 }
954 } 257 }
258