comparison Implab/Promise.cs @ 18:0c924dff5498

Слияние с promises
author cin
date Fri, 08 Nov 2013 01:27:04 +0400
parents 5a4b735ba669
children e3935fdf59a2
comparison
equal deleted inserted replaced
6:dfa21d507bc5 18:0c924dff5498
1 using System; 1 using System;
2 using System.Collections.Generic; 2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Reflection; 3 using System.Reflection;
5 using System.Text;
6 using System.Diagnostics; 4 using System.Diagnostics;
7 using System.Threading; 5 using System.Threading;
8 6
9 namespace Implab { 7 namespace Implab {
10 8
11 public delegate void ErrorHandler(Exception e); 9 public delegate void ErrorHandler(Exception e);
12 10 public delegate T ErrorHandler<out T>(Exception e);
13 public delegate void ResultHandler<T>(T result); 11 public delegate void ResultHandler<in T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); 12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); 13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16 14
17 /// <summary> 15 /// <summary>
18 /// Класс для асинхронного получения результатов. Так называемое "обещание". 16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
19 /// </summary> 17 /// </summary>
20 /// <typeparam name="T">Тип получаемого результата</typeparam> 18 /// <typeparam name="T">Тип получаемого результата</typeparam>
46 /// <para> 44 /// <para>
47 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать 45 /// Также хорошим правилом является то, что <c>Resolve</c> и <c>Reject</c> должен вызывать
48 /// только инициатор обещания иначе могут возникнуть противоречия. 46 /// только инициатор обещания иначе могут возникнуть противоречия.
49 /// </para> 47 /// </para>
50 /// </remarks> 48 /// </remarks>
51 public class Promise<T> { 49 public class Promise<T> : IPromise {
52 50
53 struct ResultHandlerInfo { 51 struct ResultHandlerInfo {
54 public ResultHandler<T> resultHandler; 52 public ResultHandler<T> resultHandler;
55 public ErrorHandler errorHandler; 53 public ErrorHandler errorHandler;
56 } 54 }
57 55
58 enum State { 56 readonly IPromise m_parent;
59 Unresolved, 57
60 Resolving, 58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 Resolved, 59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62 Cancelled 60
63 } 61 readonly object m_lock = new Object();
64 62 readonly bool m_cancellable;
65 LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>(); 63 int m_childrenCount = 0;
66 State m_state; 64
67 bool m_cancellable; 65 PromiseState m_state;
68 T m_result; 66 T m_result;
69 Exception m_error; 67 Exception m_error;
70 68
71 public Promise() { 69 public Promise() {
72 m_cancellable = true; 70 m_cancellable = true;
73 } 71 }
74 72
75 /// <summary> 73 public Promise(IPromise parent, bool cancellable) {
76 /// Событие, возникающее при отмене асинхронной операции. 74 m_cancellable = cancellable;
77 /// </summary> 75 m_parent = parent;
78 /// <description> 76 if (parent != null)
79 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. 77 parent.HandleCancelled(InternalCancel);
80 /// </description> 78 }
81 public event EventHandler Cancelled; 79
80 void InternalCancel() {
81 // don't try to cancel parent :)
82 Cancel(false);
83 }
82 84
83 /// <summary> 85 /// <summary>
84 /// Выполняет обещание, сообщая об успешном выполнении. 86 /// Выполняет обещание, сообщая об успешном выполнении.
85 /// </summary> 87 /// </summary>
86 /// <param name="result">Результат выполнения.</param> 88 /// <param name="result">Результат выполнения.</param>
87 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 89 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
88 public void Resolve(T result) { 90 public void Resolve(T result) {
89 lock (this) { 91 lock (m_lock) {
90 if (m_state == State.Cancelled) 92 if (m_state == PromiseState.Cancelled)
91 return; 93 return;
92 if (m_state != State.Unresolved) 94 if (m_state != PromiseState.Unresolved)
93 throw new InvalidOperationException("The promise is already resolved"); 95 throw new InvalidOperationException("The promise is already resolved");
94 m_result = result; 96 m_result = result;
95 m_state = State.Resolving; 97 m_state = PromiseState.Resolved;
96 } 98 }
97 99
98 ResultHandlerInfo handler; 100 OnStateChanged();
99 while (FetchNextHandler(out handler))
100 InvokeHandler(handler);
101 } 101 }
102 102
103 /// <summary> 103 /// <summary>
104 /// Выполняет обещание, сообщая об ошибке 104 /// Выполняет обещание, сообщая об ошибке
105 /// </summary> 105 /// </summary>
106 /// <remarks>
107 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
108 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
109 /// будут проигнорированы.
110 /// </remarks>
106 /// <param name="error">Исключение возникшее при выполнении операции</param> 111 /// <param name="error">Исключение возникшее при выполнении операции</param>
107 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 112 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
108 public void Reject(Exception error) { 113 public void Reject(Exception error) {
109 lock (this) { 114 lock (m_lock) {
110 if (m_state == State.Cancelled) 115 if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected)
111 return; 116 return;
112 if (m_state != State.Unresolved) 117 if (m_state != PromiseState.Unresolved)
113 throw new InvalidOperationException("The promise is already resolved"); 118 throw new InvalidOperationException("The promise is already resolved");
114 m_error = error; 119 m_error = error;
115 m_state = State.Resolving; 120 m_state = PromiseState.Rejected;
116 } 121 }
117 122
118 ResultHandlerInfo handler; 123 OnStateChanged();
119 while (FetchNextHandler(out handler))
120 InvokeHandler(handler);
121 } 124 }
122 125
123 /// <summary> 126 /// <summary>
124 /// Отменяет операцию, если это возможно. 127 /// Отменяет операцию, если это возможно.
125 /// </summary> 128 /// </summary>
126 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> 129 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
127 public bool Cancel() { 130 public bool Cancel() {
128 lock (this) { 131 return Cancel(true);
129 if (m_state == State.Unresolved && m_cancellable) { 132 }
130 m_state = State.Cancelled; 133
131 EventHandler temp = Cancelled; 134 /// <summary>
132 135 /// Adds new handlers to this promise.
133 if (temp != null) 136 /// </summary>
134 temp(this, new EventArgs()); 137 /// <param name="success">The handler of the successfully completed operation.
135 138 /// This handler will recieve an operation result as a parameter.</param>
136 return true; 139 /// <param name="error">Handles an exception that may occur during the operation.</param>
137 } else 140 /// <returns>The new promise chained to this one.</returns>
138 return false;
139 }
140 }
141
142 /// <summary>
143 /// Добавляет обработчики событий выполнения обещания.
144 /// </summary>
145 /// <param name="success">Обработчик успешного выполнения обещания.
146 /// Данному обработчику будет передан результат выполнения операции.</param>
147 /// <param name="error">Обработчик ошибки. Данный обработчик получит
148 /// исключение возникшее при выполнении операции.</param>
149 /// <returns>Само обещание</returns>
150 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) { 141 public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) {
151 if (success == null && error == null) 142 if (success == null && error == null)
152 return this; 143 return this;
153 144
154 var medium = new Promise<T>(); 145 var medium = new Promise<T>(this, true);
155 146
156 var handlerInfo = new ResultHandlerInfo(); 147 var handlerInfo = new ResultHandlerInfo();
157 148
158 if (success != null) 149 if (success != null)
159 handlerInfo.resultHandler = x => { 150 handlerInfo.resultHandler = x => {
160 try { 151 success(x);
161 success(x); 152 medium.Resolve(x);
162 medium.Resolve(x);
163 } catch (Exception e) {
164 medium.Reject(e);
165 }
166 }; 153 };
167 else 154 else
168 handlerInfo.resultHandler = x => medium.Resolve(x); 155 handlerInfo.resultHandler = medium.Resolve;
169 156
170 if (error != null) 157 if (error != null)
171 handlerInfo.errorHandler = x => { 158 handlerInfo.errorHandler = x => {
172 try { 159 try {
173 error(x); 160 error(x);
174 } catch { } 161 } catch { }
175 medium.Reject(x); 162 medium.Reject(x);
176 }; 163 };
177 else 164 else
178 handlerInfo.errorHandler = x => medium.Reject(x); 165 handlerInfo.errorHandler = medium.Reject;
179 166
180 AddHandler(handlerInfo); 167 AddHandler(handlerInfo);
181 168
182 return medium; 169 return medium;
183 } 170 }
184 171
172 /// <summary>
173 /// Adds new handlers to this promise.
174 /// </summary>
175 /// <param name="success">The handler of the successfully completed operation.
176 /// This handler will recieve an operation result as a parameter.</param>
177 /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param>
178 /// <returns>The new promise chained to this one.</returns>
179 public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) {
180 if (success == null && error == null)
181 return this;
182
183 var medium = new Promise<T>(this, true);
184
185 var handlerInfo = new ResultHandlerInfo();
186
187 if (success != null)
188 handlerInfo.resultHandler = x => {
189 success(x);
190 medium.Resolve(x);
191 };
192 else
193 handlerInfo.resultHandler = medium.Resolve;
194
195 if (error != null)
196 handlerInfo.errorHandler = x => {
197 try {
198 medium.Resolve(error(x));
199 } catch { }
200 medium.Reject(x);
201 };
202 else
203 handlerInfo.errorHandler = medium.Reject;
204
205 AddHandler(handlerInfo);
206
207 return medium;
208 }
209
210
185 public Promise<T> Then(ResultHandler<T> success) { 211 public Promise<T> Then(ResultHandler<T> success) {
186 return Then(success, null); 212 if (success == null)
213 return this;
214
215 var medium = new Promise<T>(this, true);
216
217 var handlerInfo = new ResultHandlerInfo();
218
219 if (success != null)
220 handlerInfo.resultHandler = x => {
221 success(x);
222 medium.Resolve(x);
223 };
224 else
225 handlerInfo.resultHandler = medium.Resolve;
226
227 handlerInfo.errorHandler = medium.Reject;
228
229 AddHandler(handlerInfo);
230
231 return medium;
187 } 232 }
188 233
189 public Promise<T> Error(ErrorHandler error) { 234 public Promise<T> Error(ErrorHandler error) {
190 return Then(null, error); 235 return Then(null, error);
236 }
237
238 /// <summary>
239 /// Handles error and allows to keep the promise.
240 /// </summary>
241 /// <remarks>
242 /// If the specified handler throws an exception, this exception will be used to reject the promise.
243 /// </remarks>
244 /// <param name="handler">The error handler which returns the result of the promise.</param>
245 /// <returns>New promise.</returns>
246 public Promise<T> Error(ErrorHandler<T> handler) {
247 if (handler == null)
248 return this;
249
250 var medium = new Promise<T>(this, true);
251
252 AddHandler(new ResultHandlerInfo {
253 errorHandler = e => {
254 try {
255 medium.Resolve(handler(e));
256 } catch (Exception e2) {
257 medium.Reject(e2);
258 }
259 }
260 });
261
262 return medium;
191 } 263 }
192 264
193 public Promise<T> Anyway(Action handler) { 265 public Promise<T> Anyway(Action handler) {
194 if (handler == null) 266 if (handler == null)
195 return this; 267 return this;
196 268
197 var medium = new Promise<T>(); 269 var medium = new Promise<T>();
198 270
199 AddHandler(new ResultHandlerInfo { 271 AddHandler(new ResultHandlerInfo {
200 resultHandler = x => { 272 resultHandler = x => {
273 // to avoid handler being called multiple times we handle exception by ourselfs
201 try { 274 try {
202 handler(); 275 handler();
203 medium.Resolve(x); 276 medium.Resolve(x);
204 } catch (Exception e) { 277 } catch (Exception e) {
205 medium.Reject(e); 278 medium.Reject(e);
227 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) { 300 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
228 if (mapper == null) 301 if (mapper == null)
229 throw new ArgumentNullException("mapper"); 302 throw new ArgumentNullException("mapper");
230 303
231 // создаем прицепленное обещание 304 // создаем прицепленное обещание
232 Promise<TNew> chained = new Promise<TNew>(); 305 var chained = new Promise<TNew>();
233 306
234 AddHandler(new ResultHandlerInfo() { 307 AddHandler(new ResultHandlerInfo() {
235 resultHandler = delegate(T result) { 308 resultHandler = result => chained.Resolve(mapper(result)),
236 try {
237 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
238 chained.Resolve(mapper(result));
239 } catch (Exception e) {
240 chained.Reject(e);
241 }
242 },
243 errorHandler = delegate(Exception e) { 309 errorHandler = delegate(Exception e) {
244 if (error != null) 310 if (error != null)
245 error(e); 311 try {
312 error(e);
313 } catch { }
246 // в случае ошибки нужно передать исключение дальше по цепочке 314 // в случае ошибки нужно передать исключение дальше по цепочке
247 chained.Reject(e); 315 chained.Reject(e);
248 } 316 }
249 }); 317 });
250 318
269 337
270 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно 338 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
271 // создать посредника, к которому будут подвызяваться следующие обработчики. 339 // создать посредника, к которому будут подвызяваться следующие обработчики.
272 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы 340 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
273 // передать через него результаты работы. 341 // передать через него результаты работы.
274 Promise<TNew> medium = new Promise<TNew>(); 342 var medium = new Promise<TNew>(this, true);
275 343
276 AddHandler(new ResultHandlerInfo() { 344 AddHandler(new ResultHandlerInfo {
277 resultHandler = delegate(T result) { 345 resultHandler = delegate(T result) {
278 try { 346 if (medium.State == PromiseState.Cancelled)
279 chained(result).Then( 347 return;
280 x => medium.Resolve(x), 348
281 e => medium.Reject(e) 349 var promise = chained(result);
282 ); 350
283 } catch (Exception e) { 351 // notify chained operation that it's not needed
284 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке 352 medium.Cancelled(() => promise.Cancel());
285 medium.Reject(e); 353 promise.Then(
286 } 354 x => medium.Resolve(x),
355 e => medium.Reject(e)
356 );
287 }, 357 },
288 errorHandler = delegate(Exception e) { 358 errorHandler = delegate(Exception e) {
289 if (error != null) 359 if (error != null)
290 error(e); 360 error(e);
291 // в случае ошибки нужно передать исключение дальше по цепочке 361 // в случае ошибки нужно передать исключение дальше по цепочке
296 return medium; 366 return medium;
297 } 367 }
298 368
299 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) { 369 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
300 return Chain(chained, null); 370 return Chain(chained, null);
371 }
372
373 public Promise<T> Cancelled(Action handler) {
374 if (handler == null)
375 return this;
376 lock (m_lock) {
377 if (m_state == PromiseState.Unresolved)
378 m_cancelHandlers.AddLast(handler);
379 else if (m_state == PromiseState.Cancelled)
380 handler();
381 }
382 return this;
383 }
384
385 public void HandleCancelled(Action handler) {
386 Cancelled(handler);
301 } 387 }
302 388
303 /// <summary> 389 /// <summary>
304 /// Дожидается отложенного обещания и в случае успеха, возвращает 390 /// Дожидается отложенного обещания и в случае успеха, возвращает
305 /// его, результат, в противном случае бросает исключение. 391 /// его, результат, в противном случае бросает исключение.
320 /// </para> 406 /// </para>
321 /// </remarks> 407 /// </remarks>
322 /// <param name="timeout">Время ожидания</param> 408 /// <param name="timeout">Время ожидания</param>
323 /// <returns>Результат выполнения обещания</returns> 409 /// <returns>Результат выполнения обещания</returns>
324 public T Join(int timeout) { 410 public T Join(int timeout) {
325 ManualResetEvent evt = new ManualResetEvent(false); 411 var evt = new ManualResetEvent(false);
326 Anyway(() => evt.Set()); 412 Anyway(() => evt.Set());
413 Cancelled(() => evt.Set());
327 414
328 if (!evt.WaitOne(timeout, true)) 415 if (!evt.WaitOne(timeout, true))
329 throw new TimeoutException(); 416 throw new TimeoutException();
330 417
331 if (m_error != null) 418 switch (State) {
332 throw new TargetInvocationException(m_error); 419 case PromiseState.Resolved:
333 else 420 return m_result;
334 return m_result; 421 case PromiseState.Cancelled:
422 throw new OperationCanceledException();
423 case PromiseState.Rejected:
424 throw new TargetInvocationException(m_error);
425 default:
426 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
427 }
335 } 428 }
336 429
337 public T Join() { 430 public T Join() {
338 return Join(Timeout.Infinite); 431 return Join(Timeout.Infinite);
339 } 432 }
340 433
341 /// <summary>
342 /// Данный метод последовательно извлекает обработчики обещания и когда
343 /// их больше не осталось - ставит состояние "разрешено".
344 /// </summary>
345 /// <param name="handler">Информация об обработчике</param>
346 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
347 bool FetchNextHandler(out ResultHandlerInfo handler) {
348 handler = default(ResultHandlerInfo);
349
350 lock (this) {
351 Debug.Assert(m_state == State.Resolving);
352
353 if (m_handlersChain.Count > 0) {
354 handler = m_handlersChain.First.Value;
355 m_handlersChain.RemoveFirst();
356 return true;
357 } else {
358 m_state = State.Resolved;
359 return false;
360 }
361 }
362 }
363
364 void AddHandler(ResultHandlerInfo handler) { 434 void AddHandler(ResultHandlerInfo handler) {
365 bool invokeRequired = false; 435 bool invokeRequired = false;
366 436
367 lock (this) { 437 lock (m_lock) {
368 if (m_state != State.Resolved) 438 m_childrenCount++;
369 m_handlersChain.AddLast(handler); 439 if (m_state == PromiseState.Unresolved) {
370 else 440 m_resultHandlers.AddLast(handler);
441 } else
371 invokeRequired = true; 442 invokeRequired = true;
372 } 443 }
373 444
374 // обработчики не должны блокировать сам объект 445 // обработчики не должны блокировать сам объект
375 if (invokeRequired) 446 if (invokeRequired)
376 InvokeHandler(handler); 447 InvokeHandler(handler);
377 } 448 }
378 449
379 void InvokeHandler(ResultHandlerInfo handler) { 450 void InvokeHandler(ResultHandlerInfo handler) {
380 if (m_error == null) { 451 switch (m_state) {
381 try { 452 case PromiseState.Resolved:
382 if (handler.resultHandler != null) 453 try {
383 handler.resultHandler(m_result); 454 if (handler.resultHandler != null)
384 } catch { } 455 handler.resultHandler(m_result);
385 } 456 } catch (Exception e) {
386 457 try {
387 if (m_error != null) { 458 if (handler.errorHandler != null)
388 try { 459 handler.errorHandler(e);
389 if (handler.errorHandler != null) 460 } catch { }
390 handler.errorHandler(m_error); 461 }
391 } catch { } 462 break;
392 } 463 case PromiseState.Rejected:
393 } 464 try {
394 465 if (handler.errorHandler != null)
466 handler.errorHandler(m_error);
467 } catch { }
468 break;
469 default:
470 // do nothing
471 return;
472 }
473 }
474
475 protected virtual void OnStateChanged() {
476 switch (m_state) {
477 case PromiseState.Resolved:
478 foreach (var resultHandlerInfo in m_resultHandlers)
479 try {
480 if (resultHandlerInfo.resultHandler != null)
481 resultHandlerInfo.resultHandler(m_result);
482 } catch (Exception e) {
483 try {
484 if (resultHandlerInfo.errorHandler != null)
485 resultHandlerInfo.errorHandler(e);
486 } catch { }
487 }
488 break;
489 case PromiseState.Cancelled:
490 foreach (var cancelHandler in m_cancelHandlers)
491 cancelHandler();
492 break;
493 case PromiseState.Rejected:
494 foreach (var resultHandlerInfo in m_resultHandlers)
495 try {
496 if (resultHandlerInfo.errorHandler != null)
497 resultHandlerInfo.errorHandler(m_error);
498 } catch { }
499 break;
500 default:
501 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
502 }
503
504 m_resultHandlers = null;
505 m_cancelHandlers = null;
506 }
507
508
509
510 public bool IsExclusive {
511 get {
512 lock (m_lock) {
513 return m_childrenCount <= 1;
514 }
515 }
516 }
517
518 public PromiseState State {
519 get {
520 lock (m_lock) {
521 return m_state;
522 }
523 }
524 }
525
526 protected bool Cancel(bool dependencies) {
527 bool result;
528
529 lock (m_lock) {
530 if (m_state == PromiseState.Unresolved) {
531 m_state = PromiseState.Cancelled;
532 result = true;
533 } else {
534 result = false;
535 }
536 }
537
538 if (result)
539 OnStateChanged();
540
541 if (dependencies && m_parent != null && m_parent.IsExclusive) {
542 m_parent.Cancel();
543 }
544
545 return result;
546 }
395 547
396 } 548 }
397 } 549 }