comparison Implab/Promise.cs @ 19:e3935fdf59a2 promises

Promise is rewritten to use interlocked operations instead of locks
author cin
date Sun, 10 Nov 2013 00:21:33 +0400
parents 5a4b735ba669
children 9bf5b23650c9
comparison
equal deleted inserted replaced
17:7cd4a843b4e4 19:e3935fdf59a2
1 using System; 1 using System;
2 using System.Collections.Generic; 2 using System.Collections.Generic;
3 using System.Reflection; 3 using System.Reflection;
4 using System.Diagnostics; 4 using System.Diagnostics;
5 using System.Threading; 5 using System.Threading;
6 using Implab.Parallels;
6 7
7 namespace Implab { 8 namespace Implab {
8 9
9 public delegate void ErrorHandler(Exception e); 10 public delegate void ErrorHandler(Exception e);
10 public delegate T ErrorHandler<out T>(Exception e); 11 public delegate T ErrorHandler<out T>(Exception e);
46 /// только инициатор обещания иначе могут возникнуть противоречия. 47 /// только инициатор обещания иначе могут возникнуть противоречия.
47 /// </para> 48 /// </para>
48 /// </remarks> 49 /// </remarks>
49 public class Promise<T> : IPromise { 50 public class Promise<T> : IPromise {
50 51
51 struct ResultHandlerInfo { 52 struct HandlerDescriptor {
52 public ResultHandler<T> resultHandler; 53 public ResultHandler<T> resultHandler;
53 public ErrorHandler errorHandler; 54 public ErrorHandler errorHandler;
54 } 55 public Action cancellHandler;
56
57 public void Resolve(T result) {
58 if (resultHandler != null)
59 try {
60 resultHandler(result);
61 } catch (Exception e) {
62 Reject(e);
63 }
64 }
65
66 public void Reject(Exception err) {
67 if (errorHandler != null)
68 try {
69 errorHandler(err);
70 } catch {
71 }
72 }
73
74 public void Cancel() {
75 if (cancellHandler != null)
76 try {
77 cancellHandler();
78 } catch {
79 }
80 }
81 }
82
83 const int UnresolvedSate = 0;
84 const int TransitionalState = 1;
85 const int ResolvedState = 2;
86 const int RejectedState = 3;
87 const int CancelledState = 4;
55 88
56 readonly IPromise m_parent; 89 readonly IPromise m_parent;
57
58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
60
61 readonly object m_lock = new Object();
62 readonly bool m_cancellable; 90 readonly bool m_cancellable;
91
63 int m_childrenCount = 0; 92 int m_childrenCount = 0;
64 93 int m_state;
65 PromiseState m_state;
66 T m_result; 94 T m_result;
67 Exception m_error; 95 Exception m_error;
96
97 readonly MTQueue<HandlerDescriptor> m_handlers = new MTQueue<HandlerDescriptor>();
68 98
69 public Promise() { 99 public Promise() {
70 m_cancellable = true; 100 m_cancellable = true;
71 } 101 }
72 102
73 public Promise(IPromise parent, bool cancellable) { 103 public Promise(IPromise parent, bool cancellable) {
74 m_cancellable = cancellable; 104 m_cancellable = cancellable;
75 m_parent = parent; 105 m_parent = parent;
76 if (parent != null)
77 parent.HandleCancelled(InternalCancel);
78 } 106 }
79 107
80 void InternalCancel() { 108 void InternalCancel() {
81 // don't try to cancel parent :) 109 // don't try to cancel parent :)
82 Cancel(false); 110 Cancel(false);
83 } 111 }
84 112
113 bool BeginTransit() {
114 return UnresolvedSate == Interlocked.CompareExchange(ref m_state, TransitionalState, UnresolvedSate);
115 }
116
117 void CompleteTransit(int state) {
118 if (TransitionalState != Interlocked.CompareExchange(ref m_state, state, TransitionalState))
119 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
120 }
121
122 public bool IsResolved {
123 get {
124 return m_state > 1;
125 }
126 }
127
128 public bool IsCancelled {
129 get {
130 return m_state == CancelledState;
131 }
132 }
133
85 /// <summary> 134 /// <summary>
86 /// Выполняет обещание, сообщая об успешном выполнении. 135 /// Выполняет обещание, сообщая об успешном выполнении.
87 /// </summary> 136 /// </summary>
88 /// <param name="result">Результат выполнения.</param> 137 /// <param name="result">Результат выполнения.</param>
89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 138 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
90 public void Resolve(T result) { 139 public void Resolve(T result) {
91 lock (m_lock) { 140 if (BeginTransit()) {
92 if (m_state == PromiseState.Cancelled)
93 return;
94 if (m_state != PromiseState.Unresolved)
95 throw new InvalidOperationException("The promise is already resolved");
96 m_result = result; 141 m_result = result;
97 m_state = PromiseState.Resolved; 142 CompleteTransit(ResolvedState);
98 } 143 OnStateChanged();
99 144 } else if (m_state != CancelledState)
100 OnStateChanged(); 145 throw new InvalidOperationException("The promise is already resolved");
101 } 146 }
102 147
103 /// <summary> 148 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке 149 /// Выполняет обещание, сообщая об ошибке
105 /// </summary> 150 /// </summary>
109 /// будут проигнорированы. 154 /// будут проигнорированы.
110 /// </remarks> 155 /// </remarks>
111 /// <param name="error">Исключение возникшее при выполнении операции</param> 156 /// <param name="error">Исключение возникшее при выполнении операции</param>
112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 157 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
113 public void Reject(Exception error) { 158 public void Reject(Exception error) {
114 lock (m_lock) { 159 if (BeginTransit()) {
115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
116 return;
117 if (m_state != PromiseState.Unresolved)
118 throw new InvalidOperationException("The promise is already resolved");
119 m_error = error; 160 m_error = error;
120 m_state = PromiseState.Rejected; 161 CompleteTransit(RejectedState);
121 } 162 OnStateChanged();
122 163 } else if (m_state == ResolvedState)
123 OnStateChanged(); 164 throw new InvalidOperationException("The promise is already resolved");
124 } 165 }
125 166
126 /// <summary> 167 /// <summary>
127 /// Отменяет операцию, если это возможно. 168 /// Отменяет операцию, если это возможно.
128 /// </summary> 169 /// </summary>
142 if (success == null && error == null) 183 if (success == null && error == null)
143 return this; 184 return this;
144 185
145 var medium = new Promise<T>(this, true); 186 var medium = new Promise<T>(this, true);
146 187
147 var handlerInfo = new ResultHandlerInfo(); 188 ResultHandler<T> resultHandler;
148
149 if (success != null) 189 if (success != null)
150 handlerInfo.resultHandler = x => { 190 resultHandler = x => {
151 success(x); 191 success(x);
152 medium.Resolve(x); 192 medium.Resolve(x);
153 }; 193 };
154 else 194 else
155 handlerInfo.resultHandler = medium.Resolve; 195 resultHandler = medium.Resolve;
156 196
197 ErrorHandler errorHandler;
157 if (error != null) 198 if (error != null)
158 handlerInfo.errorHandler = x => { 199 errorHandler = x => {
159 try { 200 try {
160 error(x); 201 error(x);
161 } catch { } 202 } catch { }
162 medium.Reject(x); 203 medium.Reject(x);
163 }; 204 };
164 else 205 else
165 handlerInfo.errorHandler = medium.Reject; 206 errorHandler = medium.Reject;
166 207
167 AddHandler(handlerInfo); 208 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
168 209
169 return medium; 210 return medium;
170 } 211 }
171 212
172 /// <summary> 213 /// <summary>
180 if (success == null && error == null) 221 if (success == null && error == null)
181 return this; 222 return this;
182 223
183 var medium = new Promise<T>(this, true); 224 var medium = new Promise<T>(this, true);
184 225
185 var handlerInfo = new ResultHandlerInfo(); 226 ResultHandler<T> resultHandler;
227 ErrorHandler errorHandler;
186 228
187 if (success != null) 229 if (success != null)
188 handlerInfo.resultHandler = x => { 230 resultHandler = x => {
189 success(x); 231 success(x);
190 medium.Resolve(x); 232 medium.Resolve(x);
191 }; 233 };
192 else 234 else
193 handlerInfo.resultHandler = medium.Resolve; 235 resultHandler = medium.Resolve;
194 236
195 if (error != null) 237 if (error != null)
196 handlerInfo.errorHandler = x => { 238 errorHandler = x => {
197 try { 239 try {
198 medium.Resolve(error(x)); 240 medium.Resolve(error(x));
199 } catch { } 241 } catch { }
200 medium.Reject(x); 242 medium.Reject(x);
201 }; 243 };
202 else 244 else
203 handlerInfo.errorHandler = medium.Reject; 245 errorHandler = medium.Reject;
204 246
205 AddHandler(handlerInfo); 247 AddHandler(resultHandler, errorHandler, medium.InternalCancel);
206 248
207 return medium; 249 return medium;
208 } 250 }
209 251
210 252
212 if (success == null) 254 if (success == null)
213 return this; 255 return this;
214 256
215 var medium = new Promise<T>(this, true); 257 var medium = new Promise<T>(this, true);
216 258
217 var handlerInfo = new ResultHandlerInfo(); 259 ResultHandler<T> resultHandler;
218 260
219 if (success != null) 261 if (success != null)
220 handlerInfo.resultHandler = x => { 262 resultHandler = x => {
221 success(x); 263 success(x);
222 medium.Resolve(x); 264 medium.Resolve(x);
223 }; 265 };
224 else 266 else
225 handlerInfo.resultHandler = medium.Resolve; 267 resultHandler = medium.Resolve;
226 268
227 handlerInfo.errorHandler = medium.Reject; 269 AddHandler(resultHandler, medium.Reject, medium.InternalCancel);
228
229 AddHandler(handlerInfo);
230 270
231 return medium; 271 return medium;
232 } 272 }
233 273
234 public Promise<T> Error(ErrorHandler error) { 274 public Promise<T> Error(ErrorHandler error) {
247 if (handler == null) 287 if (handler == null)
248 return this; 288 return this;
249 289
250 var medium = new Promise<T>(this, true); 290 var medium = new Promise<T>(this, true);
251 291
252 AddHandler(new ResultHandlerInfo { 292 AddHandler(
253 errorHandler = e => { 293 null,
294 e => {
254 try { 295 try {
255 medium.Resolve(handler(e)); 296 medium.Resolve(handler(e));
256 } catch (Exception e2) { 297 } catch (Exception e2) {
257 medium.Reject(e2); 298 medium.Reject(e2);
258 } 299 }
259 } 300 },
260 }); 301 medium.InternalCancel
302 );
261 303
262 return medium; 304 return medium;
263 } 305 }
264 306
265 public Promise<T> Anyway(Action handler) { 307 public Promise<T> Anyway(Action handler) {
266 if (handler == null) 308 if (handler == null)
267 return this; 309 return this;
268 310
269 var medium = new Promise<T>(); 311 var medium = new Promise<T>();
270 312
271 AddHandler(new ResultHandlerInfo { 313 AddHandler(
272 resultHandler = x => { 314 x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs 315 // to avoid handler being called multiple times we handle exception by ourselfs
274 try { 316 try {
275 handler(); 317 handler();
276 medium.Resolve(x); 318 medium.Resolve(x);
277 } catch (Exception e) { 319 } catch (Exception e) {
278 medium.Reject(e); 320 medium.Reject(e);
279 } 321 }
280 }, 322 },
281 errorHandler = x => { 323
324 e => {
282 try { 325 try {
283 handler(); 326 handler();
284 } catch { } 327 } catch { }
285 medium.Reject(x); 328 medium.Reject(e);
286 } 329 },
287 }); 330
331 medium.InternalCancel
332 );
288 333
289 return medium; 334 return medium;
290 } 335 }
291 336
292 /// <summary> 337 /// <summary>
302 throw new ArgumentNullException("mapper"); 347 throw new ArgumentNullException("mapper");
303 348
304 // создаем прицепленное обещание 349 // создаем прицепленное обещание
305 var chained = new Promise<TNew>(); 350 var chained = new Promise<TNew>();
306 351
307 AddHandler(new ResultHandlerInfo() { 352 ResultHandler<T> resultHandler = result => chained.Resolve(mapper(result));
308 resultHandler = result => chained.Resolve(mapper(result)), 353 ErrorHandler errorHandler = delegate(Exception e) {
309 errorHandler = delegate(Exception e) { 354 if (error != null)
310 if (error != null) 355 try {
311 try { 356 error(e);
312 error(e); 357 } catch { }
313 } catch { } 358 // в случае ошибки нужно передать исключение дальше по цепочке
314 // в случае ошибки нужно передать исключение дальше по цепочке 359 chained.Reject(e);
315 chained.Reject(e); 360 };
316 } 361
317 }); 362
363 AddHandler(
364 resultHandler,
365 errorHandler,
366 chained.InternalCancel
367 );
318 368
319 return chained; 369 return chained;
320 } 370 }
321 371
322 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) { 372 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper) {
339 // создать посредника, к которому будут подвызяваться следующие обработчики. 389 // создать посредника, к которому будут подвызяваться следующие обработчики.
340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы 390 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
341 // передать через него результаты работы. 391 // передать через него результаты работы.
342 var medium = new Promise<TNew>(this, true); 392 var medium = new Promise<TNew>(this, true);
343 393
344 AddHandler(new ResultHandlerInfo { 394 ResultHandler<T> resultHandler = delegate(T result) {
345 resultHandler = delegate(T result) { 395 if (medium.IsCancelled)
346 if (medium.State == PromiseState.Cancelled) 396 return;
347 return; 397
348 398 var promise = chained(result);
349 var promise = chained(result); 399
350 400 // notify chained operation that it's not needed
351 // notify chained operation that it's not needed 401 medium.Cancelled(() => promise.Cancel());
352 medium.Cancelled(() => promise.Cancel()); 402 promise.Then(
353 promise.Then( 403 x => medium.Resolve(x),
354 x => medium.Resolve(x), 404 e => medium.Reject(e)
355 e => medium.Reject(e) 405 );
356 ); 406 };
357 }, 407
358 errorHandler = delegate(Exception e) { 408 ErrorHandler errorHandler = delegate(Exception e) {
359 if (error != null) 409 if (error != null)
360 error(e); 410 error(e);
361 // в случае ошибки нужно передать исключение дальше по цепочке 411 // в случае ошибки нужно передать исключение дальше по цепочке
362 medium.Reject(e); 412 medium.Reject(e);
363 } 413 };
364 }); 414
415 AddHandler(
416 resultHandler,
417 errorHandler,
418 medium.InternalCancel
419 );
365 420
366 return medium; 421 return medium;
367 } 422 }
368 423
369 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) { 424 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
370 return Chain(chained, null); 425 return Chain(chained, null);
371 } 426 }
372 427
373 public Promise<T> Cancelled(Action handler) { 428 public Promise<T> Cancelled(Action handler) {
429 AddHandler(null, null, handler);
430 return this;
431 }
432
433 public Promise<T> Finally(Action handler) {
374 if (handler == null) 434 if (handler == null)
375 return this; 435 throw new ArgumentNullException("handler");
376 lock (m_lock) { 436 AddHandler(
377 if (m_state == PromiseState.Unresolved) 437 x => handler(),
378 m_cancelHandlers.AddLast(handler); 438 e => handler(),
379 else if (m_state == PromiseState.Cancelled) 439 handler
380 handler(); 440 );
381 }
382 return this; 441 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
387 } 442 }
388 443
389 /// <summary> 444 /// <summary>
390 /// Дожидается отложенного обещания и в случае успеха, возвращает 445 /// Дожидается отложенного обещания и в случае успеха, возвращает
391 /// его, результат, в противном случае бросает исключение. 446 /// его, результат, в противном случае бросает исключение.
413 Cancelled(() => evt.Set()); 468 Cancelled(() => evt.Set());
414 469
415 if (!evt.WaitOne(timeout, true)) 470 if (!evt.WaitOne(timeout, true))
416 throw new TimeoutException(); 471 throw new TimeoutException();
417 472
418 switch (State) { 473 switch (m_state) {
419 case PromiseState.Resolved: 474 case ResolvedState:
420 return m_result; 475 return m_result;
421 case PromiseState.Cancelled: 476 case CancelledState:
422 throw new OperationCanceledException(); 477 throw new OperationCanceledException();
423 case PromiseState.Rejected: 478 case RejectedState:
424 throw new TargetInvocationException(m_error); 479 throw new TargetInvocationException(m_error);
425 default: 480 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State)); 481 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
427 } 482 }
428 } 483 }
429 484
430 public T Join() { 485 public T Join() {
431 return Join(Timeout.Infinite); 486 return Join(Timeout.Infinite);
432 } 487 }
433 488
434 void AddHandler(ResultHandlerInfo handler) { 489 void AddHandler(ResultHandler<T> success, ErrorHandler error, Action cancel) {
435 bool invokeRequired = false; 490 Interlocked.Increment(ref m_childrenCount);
436 491
437 lock (m_lock) { 492 HandlerDescriptor handler = new HandlerDescriptor {
438 m_childrenCount++; 493 resultHandler = success,
439 if (m_state == PromiseState.Unresolved) { 494 errorHandler = error,
440 m_resultHandlers.AddLast(handler); 495 cancellHandler = cancel
441 } else 496 };
442 invokeRequired = true; 497
443 } 498 bool queued;
444 499
445 // обработчики не должны блокировать сам объект 500 if (!IsResolved) {
446 if (invokeRequired) 501 m_handlers.Enqueue(handler);
502 queued = true;
503 } else {
504 // the promise is in resolved state, just invoke the handled with minimum overhead
505 queued = false;
447 InvokeHandler(handler); 506 InvokeHandler(handler);
448 } 507 }
449 508
450 void InvokeHandler(ResultHandlerInfo handler) { 509 if (queued && IsResolved && m_handlers.TryDequeue(out handler))
510 // if the promise have been resolved while we was adding handler to the queue
511 // we can't guarantee that someone is still processing it
512 // therefore we will fetch a handler from the queue and execute it
513 // note that fetched handler may be not the one we have added
514 InvokeHandler(handler);
515
516 }
517
518 void InvokeHandler(HandlerDescriptor handler) {
451 switch (m_state) { 519 switch (m_state) {
452 case PromiseState.Resolved: 520 case ResolvedState:
453 try { 521 handler.Resolve(m_result);
454 if (handler.resultHandler != null)
455 handler.resultHandler(m_result);
456 } catch (Exception e) {
457 try {
458 if (handler.errorHandler != null)
459 handler.errorHandler(e);
460 } catch { }
461 }
462 break; 522 break;
463 case PromiseState.Rejected: 523 case RejectedState:
464 try { 524 handler.Reject(m_error);
465 if (handler.errorHandler != null) 525 break;
466 handler.errorHandler(m_error); 526 case CancelledState:
467 } catch { } 527 handler.Cancel();
468 break; 528 break;
469 default: 529 default:
470 // do nothing 530 // do nothing
471 return; 531 return;
472 } 532 }
473 } 533 }
474 534
475 protected virtual void OnStateChanged() { 535 protected virtual void OnStateChanged() {
476 switch (m_state) { 536 HandlerDescriptor handler;
477 case PromiseState.Resolved: 537 while (m_handlers.TryDequeue(out handler))
478 foreach (var resultHandlerInfo in m_resultHandlers) 538 InvokeHandler(handler);
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 } 539 }
507 540
508 541
509 542
510 public bool IsExclusive { 543 public bool IsExclusive {
511 get { 544 get {
512 lock (m_lock) { 545 return m_childrenCount <= 1;
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 } 546 }
524 } 547 }
525 548
526 protected bool Cancel(bool dependencies) { 549 protected bool Cancel(bool dependencies) {
527 bool result; 550 if (BeginTransit()) {
528 551 CompleteTransit(CancelledState);
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged(); 552 OnStateChanged();
540 553
541 if (dependencies && m_parent != null && m_parent.IsExclusive) { 554 if (dependencies && m_parent != null && m_parent.IsExclusive)
542 m_parent.Cancel(); 555 m_parent.Cancel();
543 } 556
544 557 return true;
545 return result; 558 } else {
559 return false;
560 }
546 } 561 }
547 562
548 } 563 }
549 } 564 }