comparison Implab/Promise.cs @ 10:aa33d0bb8c0c promises

implemeted new cancellable promises concept
author cin
date Sun, 03 Nov 2013 18:07:38 +0400
parents c82e0dfbb4dd
children 6ec82bf68c8e
comparison
equal deleted inserted replaced
9:c82e0dfbb4dd 10:aa33d0bb8c0c
1 using System; 1 using System;
2 using System.Collections.Generic; 2 using System.Collections.Generic;
3 using System.Linq;
4 using System.Reflection; 3 using System.Reflection;
5 using System.Text;
6 using System.Diagnostics; 4 using System.Diagnostics;
7 using System.Threading; 5 using System.Threading;
8 6
9 namespace Implab { 7 namespace Implab {
10 8
11 public delegate void ErrorHandler(Exception e); 9 public delegate void ErrorHandler(Exception e);
12 10
13 public delegate void ResultHandler<T>(T result); 11 public delegate void ResultHandler<in T>(T result);
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); 12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result);
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); 13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result);
16 14
17 /// <summary> 15 /// <summary>
18 /// Класс для асинхронного получения результатов. Так называемое "обещание". 16 /// Класс для асинхронного получения результатов. Так называемое "обещание".
19 /// </summary> 17 /// </summary>
20 /// <typeparam name="T">Тип получаемого результата</typeparam> 18 /// <typeparam name="T">Тип получаемого результата</typeparam>
53 struct ResultHandlerInfo { 51 struct ResultHandlerInfo {
54 public ResultHandler<T> resultHandler; 52 public ResultHandler<T> resultHandler;
55 public ErrorHandler errorHandler; 53 public ErrorHandler errorHandler;
56 } 54 }
57 55
58 IPromise m_parent; 56 readonly IPromise m_parent;
59 57
60 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); 58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>();
61 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); 59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>();
62 60
63 object m_lock = new Object(); 61 readonly object m_lock = new Object();
64 bool m_cancellable; 62 readonly bool m_cancellable;
63 int m_childrenCount = 0;
65 64
66 PromiseState m_state; 65 PromiseState m_state;
67 T m_result; 66 T m_result;
68 Exception m_error; 67 Exception m_error;
69 68
70 int m_childrenCount;
71
72 public Promise() { 69 public Promise() {
73 m_cancellable = true; 70 m_cancellable = true;
74 } 71 }
75 72
76 public Promise(IPromise parent, bool cancellable) { 73 public Promise(IPromise parent, bool cancellable) {
77 m_cancellable = cancellable; 74 m_cancellable = cancellable;
78 m_parent = parent; 75 m_parent = parent;
79 } 76 if (parent != null)
80 77 parent.HandleCancelled(InternalCancel);
81 /// <summary> 78 }
82 /// Событие, возникающее при отмене асинхронной операции. 79
83 /// </summary> 80 void InternalCancel() {
84 /// <description> 81 // don't try to cancel parent :)
85 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. 82 Cancel(false);
86 /// </description> 83 }
87 public event EventHandler Cancelled;
88 84
89 /// <summary> 85 /// <summary>
90 /// Выполняет обещание, сообщая об успешном выполнении. 86 /// Выполняет обещание, сообщая об успешном выполнении.
91 /// </summary> 87 /// </summary>
92 /// <param name="result">Результат выполнения.</param> 88 /// <param name="result">Результат выполнения.</param>
99 throw new InvalidOperationException("The promise is already resolved"); 95 throw new InvalidOperationException("The promise is already resolved");
100 m_result = result; 96 m_result = result;
101 m_state = PromiseState.Resolved; 97 m_state = PromiseState.Resolved;
102 } 98 }
103 99
104 // state has been changed to rejected new handlers can't be added 100 OnStateChanged();
105
106 foreach (var handler in m_resultHandlers)
107 InvokeHandler(handler);
108
109 /* ResultHandlerInfo handler;
110 while (FetchNextHandler(out handler))
111 InvokeHandler(handler); */
112 } 101 }
113 102
114 /// <summary> 103 /// <summary>
115 /// Выполняет обещание, сообщая об ошибке 104 /// Выполняет обещание, сообщая об ошибке
116 /// </summary> 105 /// </summary>
124 throw new InvalidOperationException("The promise is already resolved"); 113 throw new InvalidOperationException("The promise is already resolved");
125 m_error = error; 114 m_error = error;
126 m_state = PromiseState.Rejected; 115 m_state = PromiseState.Rejected;
127 } 116 }
128 117
129 // state has been changed to rejected new handlers can't be added 118 OnStateChanged();
130
131 foreach (var handler in m_resultHandlers)
132 InvokeHandler(handler);
133
134 /*ResultHandlerInfo handler;
135 while (FetchNextHandler(out handler))
136 InvokeHandler(handler);*/
137 } 119 }
138 120
139 /// <summary> 121 /// <summary>
140 /// Отменяет операцию, если это возможно. 122 /// Отменяет операцию, если это возможно.
141 /// </summary> 123 /// </summary>
142 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> 124 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns>
143 public bool Cancel() { 125 public bool Cancel() {
144 return Cancel(true); 126 return Cancel(true);
127 }
128
129 protected virtual void OnStateChanged() {
130 switch (m_state) {
131 case PromiseState.Resolved:
132 foreach (var resultHandlerInfo in m_resultHandlers)
133 try {
134 if (resultHandlerInfo.resultHandler != null)
135 resultHandlerInfo.resultHandler(m_result);
136 } catch (Exception e) {
137 try {
138 if (resultHandlerInfo.errorHandler != null)
139 resultHandlerInfo.errorHandler(e);
140 } catch { }
141 }
142 break;
143 case PromiseState.Cancelled:
144 foreach (var cancelHandler in m_cancelHandlers)
145 cancelHandler();
146 break;
147 case PromiseState.Rejected:
148 foreach (var resultHandlerInfo in m_resultHandlers)
149 try {
150 if (resultHandlerInfo.errorHandler != null)
151 resultHandlerInfo.errorHandler(m_error);
152 } catch { }
153 break;
154 default:
155 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state));
156 }
157
158 m_resultHandlers = null;
159 m_cancelHandlers = null;
145 } 160 }
146 161
147 /// <summary> 162 /// <summary>
148 /// Добавляет обработчики событий выполнения обещания. 163 /// Добавляет обработчики событий выполнения обещания.
149 /// </summary> 164 /// </summary>
160 175
161 var handlerInfo = new ResultHandlerInfo(); 176 var handlerInfo = new ResultHandlerInfo();
162 177
163 if (success != null) 178 if (success != null)
164 handlerInfo.resultHandler = x => { 179 handlerInfo.resultHandler = x => {
165 try { 180 success(x);
166 success(x); 181 medium.Resolve(x);
167 medium.Resolve(x);
168 } catch (Exception e) {
169 medium.Reject(e);
170 }
171 }; 182 };
172 else 183 else
173 handlerInfo.resultHandler = x => medium.Resolve(x); 184 handlerInfo.resultHandler = medium.Resolve;
174 185
175 if (error != null) 186 if (error != null)
176 handlerInfo.errorHandler = x => { 187 handlerInfo.errorHandler = x => {
177 try { 188 try {
178 error(x); 189 error(x);
179 } catch { } 190 } catch { }
180 medium.Reject(x); 191 medium.Reject(x);
181 }; 192 };
182 else 193 else
183 handlerInfo.errorHandler = x => medium.Reject(x); 194 handlerInfo.errorHandler = medium.Reject;
184 195
185 AddHandler(handlerInfo); 196 AddHandler(handlerInfo);
186 197
187 return medium; 198 return medium;
188 } 199 }
201 212
202 var medium = new Promise<T>(); 213 var medium = new Promise<T>();
203 214
204 AddHandler(new ResultHandlerInfo { 215 AddHandler(new ResultHandlerInfo {
205 resultHandler = x => { 216 resultHandler = x => {
217 // to avoid handler being called multiple times we handle exception by ourselfs
206 try { 218 try {
207 handler(); 219 handler();
208 medium.Resolve(x); 220 medium.Resolve(x);
209 } catch (Exception e) { 221 } catch (Exception e) {
210 medium.Reject(e); 222 medium.Reject(e);
232 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) { 244 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) {
233 if (mapper == null) 245 if (mapper == null)
234 throw new ArgumentNullException("mapper"); 246 throw new ArgumentNullException("mapper");
235 247
236 // создаем прицепленное обещание 248 // создаем прицепленное обещание
237 Promise<TNew> chained = new Promise<TNew>(); 249 var chained = new Promise<TNew>();
238 250
239 AddHandler(new ResultHandlerInfo() { 251 AddHandler(new ResultHandlerInfo() {
240 resultHandler = delegate(T result) { 252 resultHandler = result => chained.Resolve(mapper(result)),
241 try {
242 // если преобразование выдаст исключение, то сработает reject сцепленного deferred
243 chained.Resolve(mapper(result));
244 } catch (Exception e) {
245 chained.Reject(e);
246 }
247 },
248 errorHandler = delegate(Exception e) { 253 errorHandler = delegate(Exception e) {
249 if (error != null) 254 if (error != null)
250 error(e); 255 try {
256 error(e);
257 } catch { }
251 // в случае ошибки нужно передать исключение дальше по цепочке 258 // в случае ошибки нужно передать исключение дальше по цепочке
252 chained.Reject(e); 259 chained.Reject(e);
253 } 260 }
254 }); 261 });
255 262
274 281
275 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно 282 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно
276 // создать посредника, к которому будут подвызяваться следующие обработчики. 283 // создать посредника, к которому будут подвызяваться следующие обработчики.
277 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы 284 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы
278 // передать через него результаты работы. 285 // передать через него результаты работы.
279 Promise<TNew> medium = new Promise<TNew>(); 286 var medium = new Promise<TNew>(this, true);
280 287
281 AddHandler(new ResultHandlerInfo() { 288 AddHandler(new ResultHandlerInfo {
282 resultHandler = delegate(T result) { 289 resultHandler = delegate(T result) {
283 try { 290 if (medium.State == PromiseState.Cancelled)
284 chained(result).Then( 291 return;
285 x => medium.Resolve(x), 292
286 e => medium.Reject(e) 293 var promise = chained(result);
287 ); 294
288 } catch (Exception e) { 295 // notify chained operation that it's not needed
289 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке 296 medium.Cancelled(() => promise.Cancel());
290 medium.Reject(e); 297 promise.Then(
291 } 298 medium.Resolve,
299 medium.Reject
300 );
292 }, 301 },
293 errorHandler = delegate(Exception e) { 302 errorHandler = delegate(Exception e) {
294 if (error != null) 303 if (error != null)
295 error(e); 304 error(e);
296 // в случае ошибки нужно передать исключение дальше по цепочке 305 // в случае ошибки нужно передать исключение дальше по цепочке
301 return medium; 310 return medium;
302 } 311 }
303 312
304 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) { 313 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) {
305 return Chain(chained, null); 314 return Chain(chained, null);
315 }
316
317 public Promise<T> Cancelled(Action handler) {
318 if (handler == null)
319 return this;
320 lock (m_lock) {
321 if (m_state == PromiseState.Unresolved)
322 m_cancelHandlers.AddLast(handler);
323 else if (m_state == PromiseState.Cancelled)
324 handler();
325 }
326 return this;
327 }
328
329 public void HandleCancelled(Action handler) {
330 Cancelled(handler);
306 } 331 }
307 332
308 /// <summary> 333 /// <summary>
309 /// Дожидается отложенного обещания и в случае успеха, возвращает 334 /// Дожидается отложенного обещания и в случае успеха, возвращает
310 /// его, результат, в противном случае бросает исключение. 335 /// его, результат, в противном случае бросает исключение.
325 /// </para> 350 /// </para>
326 /// </remarks> 351 /// </remarks>
327 /// <param name="timeout">Время ожидания</param> 352 /// <param name="timeout">Время ожидания</param>
328 /// <returns>Результат выполнения обещания</returns> 353 /// <returns>Результат выполнения обещания</returns>
329 public T Join(int timeout) { 354 public T Join(int timeout) {
330 ManualResetEvent evt = new ManualResetEvent(false); 355 var evt = new ManualResetEvent(false);
331 Anyway(() => evt.Set()); 356 Anyway(() => evt.Set());
357 Cancelled(() => evt.Set());
332 358
333 if (!evt.WaitOne(timeout, true)) 359 if (!evt.WaitOne(timeout, true))
334 throw new TimeoutException(); 360 throw new TimeoutException();
335 361
336 if (m_error != null) 362 switch (State) {
337 throw new TargetInvocationException(m_error); 363 case PromiseState.Resolved:
338 else 364 return m_result;
339 return m_result; 365 case PromiseState.Cancelled:
366 throw new OperationCanceledException();
367 case PromiseState.Rejected:
368 throw new TargetInvocationException(m_error);
369 default:
370 throw new ApplicationException(String.Format("Invalid promise state {0}", State));
371 }
340 } 372 }
341 373
342 public T Join() { 374 public T Join() {
343 return Join(Timeout.Infinite); 375 return Join(Timeout.Infinite);
344 } 376 }
345 377
346 /// <summary>
347 /// Данный метод последовательно извлекает обработчики обещания и когда
348 /// их больше не осталось - ставит состояние "разрешено".
349 /// </summary>
350 /// <param name="handler">Информация об обработчике</param>
351 /// <returns>Признак того, что еще остались обработчики в очереди</returns>
352 bool FetchNextHandler(out ResultHandlerInfo handler) {
353 handler = default(ResultHandlerInfo);
354
355 lock (this) {
356 Debug.Assert(m_state != PromiseState.Unresolved);
357
358 if (m_resultHandlers.Count > 0) {
359 handler = m_resultHandlers.First.Value;
360 m_resultHandlers.RemoveFirst();
361 return true;
362 } else {
363 return false;
364 }
365 }
366 }
367
368 void AddHandler(ResultHandlerInfo handler) { 378 void AddHandler(ResultHandlerInfo handler) {
369 bool invokeRequired = false; 379 bool invokeRequired = false;
370 380
371 lock (this) { 381 lock (m_lock) {
372 if (m_state == PromiseState.Unresolved) 382 m_childrenCount++;
383 if (m_state == PromiseState.Unresolved) {
373 m_resultHandlers.AddLast(handler); 384 m_resultHandlers.AddLast(handler);
374 else 385 } else
375 invokeRequired = true; 386 invokeRequired = true;
376 } 387 }
377 388
378 // обработчики не должны блокировать сам объект 389 // обработчики не должны блокировать сам объект
379 if (invokeRequired) 390 if (invokeRequired)
380 InvokeHandler(handler); 391 InvokeHandler(handler);
381 } 392 }
382 393
383 void InvokeHandler(ResultHandlerInfo handler) { 394 void InvokeHandler(ResultHandlerInfo handler) {
384 if (m_error == null) { 395 switch (m_state) {
385 try { 396 case PromiseState.Resolved:
386 if (handler.resultHandler != null) 397 try {
387 handler.resultHandler(m_result); 398 if (handler.resultHandler != null)
388 } catch { } 399 handler.resultHandler(m_result);
389 } 400 } catch (Exception e) {
390 401 try {
391 if (m_error != null) { 402 if (handler.errorHandler != null)
392 try { 403 handler.errorHandler(e);
393 if (handler.errorHandler != null) 404 } catch { }
394 handler.errorHandler(m_error); 405 }
395 } catch { } 406 break;
407 case PromiseState.Rejected:
408 try {
409 if (handler.errorHandler != null)
410 handler.errorHandler(m_error);
411 } catch { }
412 break;
413 default:
414 // do nothing
415 return;
396 } 416 }
397 } 417 }
398 418
399 419
400 420
424 } else { 444 } else {
425 result = false; 445 result = false;
426 } 446 }
427 } 447 }
428 448
449 if (result)
450 OnStateChanged();
451
429 if (dependencies && m_parent != null && m_parent.IsExclusive) { 452 if (dependencies && m_parent != null && m_parent.IsExclusive) {
430 // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive)
431 m_parent.Cancel(true); 453 m_parent.Cancel(true);
432 }
433
434 if (result) {
435 // state has been changed to cancelled, new handlers can't be added
436 foreach (var handler in m_cancelHandlers)
437 handler();
438 } 454 }
439 455
440 return result; 456 return result;
441 } 457 }
442 } 458 }