Mercurial > pub > ImplabNet
comparison Implab/Promise.cs @ 10:aa33d0bb8c0c promises
implemeted new cancellable promises concept
author | cin |
---|---|
date | Sun, 03 Nov 2013 18:07:38 +0400 |
parents | c82e0dfbb4dd |
children | 6ec82bf68c8e |
comparison
equal
deleted
inserted
replaced
9:c82e0dfbb4dd | 10:aa33d0bb8c0c |
---|---|
1 using System; | 1 using System; |
2 using System.Collections.Generic; | 2 using System.Collections.Generic; |
3 using System.Linq; | |
4 using System.Reflection; | 3 using System.Reflection; |
5 using System.Text; | |
6 using System.Diagnostics; | 4 using System.Diagnostics; |
7 using System.Threading; | 5 using System.Threading; |
8 | 6 |
9 namespace Implab { | 7 namespace Implab { |
10 | 8 |
11 public delegate void ErrorHandler(Exception e); | 9 public delegate void ErrorHandler(Exception e); |
12 | 10 |
13 public delegate void ResultHandler<T>(T result); | 11 public delegate void ResultHandler<in T>(T result); |
14 public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); | 12 public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result); |
15 public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); | 13 public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result); |
16 | 14 |
17 /// <summary> | 15 /// <summary> |
18 /// Класс для асинхронного получения результатов. Так называемое "обещание". | 16 /// Класс для асинхронного получения результатов. Так называемое "обещание". |
19 /// </summary> | 17 /// </summary> |
20 /// <typeparam name="T">Тип получаемого результата</typeparam> | 18 /// <typeparam name="T">Тип получаемого результата</typeparam> |
53 struct ResultHandlerInfo { | 51 struct ResultHandlerInfo { |
54 public ResultHandler<T> resultHandler; | 52 public ResultHandler<T> resultHandler; |
55 public ErrorHandler errorHandler; | 53 public ErrorHandler errorHandler; |
56 } | 54 } |
57 | 55 |
58 IPromise m_parent; | 56 readonly IPromise m_parent; |
59 | 57 |
60 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); | 58 LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); |
61 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); | 59 LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); |
62 | 60 |
63 object m_lock = new Object(); | 61 readonly object m_lock = new Object(); |
64 bool m_cancellable; | 62 readonly bool m_cancellable; |
63 int m_childrenCount = 0; | |
65 | 64 |
66 PromiseState m_state; | 65 PromiseState m_state; |
67 T m_result; | 66 T m_result; |
68 Exception m_error; | 67 Exception m_error; |
69 | 68 |
70 int m_childrenCount; | |
71 | |
72 public Promise() { | 69 public Promise() { |
73 m_cancellable = true; | 70 m_cancellable = true; |
74 } | 71 } |
75 | 72 |
76 public Promise(IPromise parent, bool cancellable) { | 73 public Promise(IPromise parent, bool cancellable) { |
77 m_cancellable = cancellable; | 74 m_cancellable = cancellable; |
78 m_parent = parent; | 75 m_parent = parent; |
79 } | 76 if (parent != null) |
80 | 77 parent.HandleCancelled(InternalCancel); |
81 /// <summary> | 78 } |
82 /// Событие, возникающее при отмене асинхронной операции. | 79 |
83 /// </summary> | 80 void InternalCancel() { |
84 /// <description> | 81 // don't try to cancel parent :) |
85 /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. | 82 Cancel(false); |
86 /// </description> | 83 } |
87 public event EventHandler Cancelled; | |
88 | 84 |
89 /// <summary> | 85 /// <summary> |
90 /// Выполняет обещание, сообщая об успешном выполнении. | 86 /// Выполняет обещание, сообщая об успешном выполнении. |
91 /// </summary> | 87 /// </summary> |
92 /// <param name="result">Результат выполнения.</param> | 88 /// <param name="result">Результат выполнения.</param> |
99 throw new InvalidOperationException("The promise is already resolved"); | 95 throw new InvalidOperationException("The promise is already resolved"); |
100 m_result = result; | 96 m_result = result; |
101 m_state = PromiseState.Resolved; | 97 m_state = PromiseState.Resolved; |
102 } | 98 } |
103 | 99 |
104 // state has been changed to rejected new handlers can't be added | 100 OnStateChanged(); |
105 | |
106 foreach (var handler in m_resultHandlers) | |
107 InvokeHandler(handler); | |
108 | |
109 /* ResultHandlerInfo handler; | |
110 while (FetchNextHandler(out handler)) | |
111 InvokeHandler(handler); */ | |
112 } | 101 } |
113 | 102 |
114 /// <summary> | 103 /// <summary> |
115 /// Выполняет обещание, сообщая об ошибке | 104 /// Выполняет обещание, сообщая об ошибке |
116 /// </summary> | 105 /// </summary> |
124 throw new InvalidOperationException("The promise is already resolved"); | 113 throw new InvalidOperationException("The promise is already resolved"); |
125 m_error = error; | 114 m_error = error; |
126 m_state = PromiseState.Rejected; | 115 m_state = PromiseState.Rejected; |
127 } | 116 } |
128 | 117 |
129 // state has been changed to rejected new handlers can't be added | 118 OnStateChanged(); |
130 | |
131 foreach (var handler in m_resultHandlers) | |
132 InvokeHandler(handler); | |
133 | |
134 /*ResultHandlerInfo handler; | |
135 while (FetchNextHandler(out handler)) | |
136 InvokeHandler(handler);*/ | |
137 } | 119 } |
138 | 120 |
139 /// <summary> | 121 /// <summary> |
140 /// Отменяет операцию, если это возможно. | 122 /// Отменяет операцию, если это возможно. |
141 /// </summary> | 123 /// </summary> |
142 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> | 124 /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> |
143 public bool Cancel() { | 125 public bool Cancel() { |
144 return Cancel(true); | 126 return Cancel(true); |
127 } | |
128 | |
129 protected virtual void OnStateChanged() { | |
130 switch (m_state) { | |
131 case PromiseState.Resolved: | |
132 foreach (var resultHandlerInfo in m_resultHandlers) | |
133 try { | |
134 if (resultHandlerInfo.resultHandler != null) | |
135 resultHandlerInfo.resultHandler(m_result); | |
136 } catch (Exception e) { | |
137 try { | |
138 if (resultHandlerInfo.errorHandler != null) | |
139 resultHandlerInfo.errorHandler(e); | |
140 } catch { } | |
141 } | |
142 break; | |
143 case PromiseState.Cancelled: | |
144 foreach (var cancelHandler in m_cancelHandlers) | |
145 cancelHandler(); | |
146 break; | |
147 case PromiseState.Rejected: | |
148 foreach (var resultHandlerInfo in m_resultHandlers) | |
149 try { | |
150 if (resultHandlerInfo.errorHandler != null) | |
151 resultHandlerInfo.errorHandler(m_error); | |
152 } catch { } | |
153 break; | |
154 default: | |
155 throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); | |
156 } | |
157 | |
158 m_resultHandlers = null; | |
159 m_cancelHandlers = null; | |
145 } | 160 } |
146 | 161 |
147 /// <summary> | 162 /// <summary> |
148 /// Добавляет обработчики событий выполнения обещания. | 163 /// Добавляет обработчики событий выполнения обещания. |
149 /// </summary> | 164 /// </summary> |
160 | 175 |
161 var handlerInfo = new ResultHandlerInfo(); | 176 var handlerInfo = new ResultHandlerInfo(); |
162 | 177 |
163 if (success != null) | 178 if (success != null) |
164 handlerInfo.resultHandler = x => { | 179 handlerInfo.resultHandler = x => { |
165 try { | 180 success(x); |
166 success(x); | 181 medium.Resolve(x); |
167 medium.Resolve(x); | |
168 } catch (Exception e) { | |
169 medium.Reject(e); | |
170 } | |
171 }; | 182 }; |
172 else | 183 else |
173 handlerInfo.resultHandler = x => medium.Resolve(x); | 184 handlerInfo.resultHandler = medium.Resolve; |
174 | 185 |
175 if (error != null) | 186 if (error != null) |
176 handlerInfo.errorHandler = x => { | 187 handlerInfo.errorHandler = x => { |
177 try { | 188 try { |
178 error(x); | 189 error(x); |
179 } catch { } | 190 } catch { } |
180 medium.Reject(x); | 191 medium.Reject(x); |
181 }; | 192 }; |
182 else | 193 else |
183 handlerInfo.errorHandler = x => medium.Reject(x); | 194 handlerInfo.errorHandler = medium.Reject; |
184 | 195 |
185 AddHandler(handlerInfo); | 196 AddHandler(handlerInfo); |
186 | 197 |
187 return medium; | 198 return medium; |
188 } | 199 } |
201 | 212 |
202 var medium = new Promise<T>(); | 213 var medium = new Promise<T>(); |
203 | 214 |
204 AddHandler(new ResultHandlerInfo { | 215 AddHandler(new ResultHandlerInfo { |
205 resultHandler = x => { | 216 resultHandler = x => { |
217 // to avoid handler being called multiple times we handle exception by ourselfs | |
206 try { | 218 try { |
207 handler(); | 219 handler(); |
208 medium.Resolve(x); | 220 medium.Resolve(x); |
209 } catch (Exception e) { | 221 } catch (Exception e) { |
210 medium.Reject(e); | 222 medium.Reject(e); |
232 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) { | 244 public Promise<TNew> Map<TNew>(ResultMapper<T, TNew> mapper, ErrorHandler error) { |
233 if (mapper == null) | 245 if (mapper == null) |
234 throw new ArgumentNullException("mapper"); | 246 throw new ArgumentNullException("mapper"); |
235 | 247 |
236 // создаем прицепленное обещание | 248 // создаем прицепленное обещание |
237 Promise<TNew> chained = new Promise<TNew>(); | 249 var chained = new Promise<TNew>(); |
238 | 250 |
239 AddHandler(new ResultHandlerInfo() { | 251 AddHandler(new ResultHandlerInfo() { |
240 resultHandler = delegate(T result) { | 252 resultHandler = result => chained.Resolve(mapper(result)), |
241 try { | |
242 // если преобразование выдаст исключение, то сработает reject сцепленного deferred | |
243 chained.Resolve(mapper(result)); | |
244 } catch (Exception e) { | |
245 chained.Reject(e); | |
246 } | |
247 }, | |
248 errorHandler = delegate(Exception e) { | 253 errorHandler = delegate(Exception e) { |
249 if (error != null) | 254 if (error != null) |
250 error(e); | 255 try { |
256 error(e); | |
257 } catch { } | |
251 // в случае ошибки нужно передать исключение дальше по цепочке | 258 // в случае ошибки нужно передать исключение дальше по цепочке |
252 chained.Reject(e); | 259 chained.Reject(e); |
253 } | 260 } |
254 }); | 261 }); |
255 | 262 |
274 | 281 |
275 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно | 282 // проблема в том, что на момент связывания еще не начата асинхронная операция, поэтому нужно |
276 // создать посредника, к которому будут подвызяваться следующие обработчики. | 283 // создать посредника, к которому будут подвызяваться следующие обработчики. |
277 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы | 284 // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы |
278 // передать через него результаты работы. | 285 // передать через него результаты работы. |
279 Promise<TNew> medium = new Promise<TNew>(); | 286 var medium = new Promise<TNew>(this, true); |
280 | 287 |
281 AddHandler(new ResultHandlerInfo() { | 288 AddHandler(new ResultHandlerInfo { |
282 resultHandler = delegate(T result) { | 289 resultHandler = delegate(T result) { |
283 try { | 290 if (medium.State == PromiseState.Cancelled) |
284 chained(result).Then( | 291 return; |
285 x => medium.Resolve(x), | 292 |
286 e => medium.Reject(e) | 293 var promise = chained(result); |
287 ); | 294 |
288 } catch (Exception e) { | 295 // notify chained operation that it's not needed |
289 // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке | 296 medium.Cancelled(() => promise.Cancel()); |
290 medium.Reject(e); | 297 promise.Then( |
291 } | 298 medium.Resolve, |
299 medium.Reject | |
300 ); | |
292 }, | 301 }, |
293 errorHandler = delegate(Exception e) { | 302 errorHandler = delegate(Exception e) { |
294 if (error != null) | 303 if (error != null) |
295 error(e); | 304 error(e); |
296 // в случае ошибки нужно передать исключение дальше по цепочке | 305 // в случае ошибки нужно передать исключение дальше по цепочке |
301 return medium; | 310 return medium; |
302 } | 311 } |
303 | 312 |
304 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) { | 313 public Promise<TNew> Chain<TNew>(ChainedOperation<T, TNew> chained) { |
305 return Chain(chained, null); | 314 return Chain(chained, null); |
315 } | |
316 | |
317 public Promise<T> Cancelled(Action handler) { | |
318 if (handler == null) | |
319 return this; | |
320 lock (m_lock) { | |
321 if (m_state == PromiseState.Unresolved) | |
322 m_cancelHandlers.AddLast(handler); | |
323 else if (m_state == PromiseState.Cancelled) | |
324 handler(); | |
325 } | |
326 return this; | |
327 } | |
328 | |
329 public void HandleCancelled(Action handler) { | |
330 Cancelled(handler); | |
306 } | 331 } |
307 | 332 |
308 /// <summary> | 333 /// <summary> |
309 /// Дожидается отложенного обещания и в случае успеха, возвращает | 334 /// Дожидается отложенного обещания и в случае успеха, возвращает |
310 /// его, результат, в противном случае бросает исключение. | 335 /// его, результат, в противном случае бросает исключение. |
325 /// </para> | 350 /// </para> |
326 /// </remarks> | 351 /// </remarks> |
327 /// <param name="timeout">Время ожидания</param> | 352 /// <param name="timeout">Время ожидания</param> |
328 /// <returns>Результат выполнения обещания</returns> | 353 /// <returns>Результат выполнения обещания</returns> |
329 public T Join(int timeout) { | 354 public T Join(int timeout) { |
330 ManualResetEvent evt = new ManualResetEvent(false); | 355 var evt = new ManualResetEvent(false); |
331 Anyway(() => evt.Set()); | 356 Anyway(() => evt.Set()); |
357 Cancelled(() => evt.Set()); | |
332 | 358 |
333 if (!evt.WaitOne(timeout, true)) | 359 if (!evt.WaitOne(timeout, true)) |
334 throw new TimeoutException(); | 360 throw new TimeoutException(); |
335 | 361 |
336 if (m_error != null) | 362 switch (State) { |
337 throw new TargetInvocationException(m_error); | 363 case PromiseState.Resolved: |
338 else | 364 return m_result; |
339 return m_result; | 365 case PromiseState.Cancelled: |
366 throw new OperationCanceledException(); | |
367 case PromiseState.Rejected: | |
368 throw new TargetInvocationException(m_error); | |
369 default: | |
370 throw new ApplicationException(String.Format("Invalid promise state {0}", State)); | |
371 } | |
340 } | 372 } |
341 | 373 |
342 public T Join() { | 374 public T Join() { |
343 return Join(Timeout.Infinite); | 375 return Join(Timeout.Infinite); |
344 } | 376 } |
345 | 377 |
346 /// <summary> | |
347 /// Данный метод последовательно извлекает обработчики обещания и когда | |
348 /// их больше не осталось - ставит состояние "разрешено". | |
349 /// </summary> | |
350 /// <param name="handler">Информация об обработчике</param> | |
351 /// <returns>Признак того, что еще остались обработчики в очереди</returns> | |
352 bool FetchNextHandler(out ResultHandlerInfo handler) { | |
353 handler = default(ResultHandlerInfo); | |
354 | |
355 lock (this) { | |
356 Debug.Assert(m_state != PromiseState.Unresolved); | |
357 | |
358 if (m_resultHandlers.Count > 0) { | |
359 handler = m_resultHandlers.First.Value; | |
360 m_resultHandlers.RemoveFirst(); | |
361 return true; | |
362 } else { | |
363 return false; | |
364 } | |
365 } | |
366 } | |
367 | |
368 void AddHandler(ResultHandlerInfo handler) { | 378 void AddHandler(ResultHandlerInfo handler) { |
369 bool invokeRequired = false; | 379 bool invokeRequired = false; |
370 | 380 |
371 lock (this) { | 381 lock (m_lock) { |
372 if (m_state == PromiseState.Unresolved) | 382 m_childrenCount++; |
383 if (m_state == PromiseState.Unresolved) { | |
373 m_resultHandlers.AddLast(handler); | 384 m_resultHandlers.AddLast(handler); |
374 else | 385 } else |
375 invokeRequired = true; | 386 invokeRequired = true; |
376 } | 387 } |
377 | 388 |
378 // обработчики не должны блокировать сам объект | 389 // обработчики не должны блокировать сам объект |
379 if (invokeRequired) | 390 if (invokeRequired) |
380 InvokeHandler(handler); | 391 InvokeHandler(handler); |
381 } | 392 } |
382 | 393 |
383 void InvokeHandler(ResultHandlerInfo handler) { | 394 void InvokeHandler(ResultHandlerInfo handler) { |
384 if (m_error == null) { | 395 switch (m_state) { |
385 try { | 396 case PromiseState.Resolved: |
386 if (handler.resultHandler != null) | 397 try { |
387 handler.resultHandler(m_result); | 398 if (handler.resultHandler != null) |
388 } catch { } | 399 handler.resultHandler(m_result); |
389 } | 400 } catch (Exception e) { |
390 | 401 try { |
391 if (m_error != null) { | 402 if (handler.errorHandler != null) |
392 try { | 403 handler.errorHandler(e); |
393 if (handler.errorHandler != null) | 404 } catch { } |
394 handler.errorHandler(m_error); | 405 } |
395 } catch { } | 406 break; |
407 case PromiseState.Rejected: | |
408 try { | |
409 if (handler.errorHandler != null) | |
410 handler.errorHandler(m_error); | |
411 } catch { } | |
412 break; | |
413 default: | |
414 // do nothing | |
415 return; | |
396 } | 416 } |
397 } | 417 } |
398 | 418 |
399 | 419 |
400 | 420 |
424 } else { | 444 } else { |
425 result = false; | 445 result = false; |
426 } | 446 } |
427 } | 447 } |
428 | 448 |
449 if (result) | |
450 OnStateChanged(); | |
451 | |
429 if (dependencies && m_parent != null && m_parent.IsExclusive) { | 452 if (dependencies && m_parent != null && m_parent.IsExclusive) { |
430 // TODO syncronize IsExclusive, AddHandler, Cancel (maybe CancelExclusive) | |
431 m_parent.Cancel(true); | 453 m_parent.Cancel(true); |
432 } | |
433 | |
434 if (result) { | |
435 // state has been changed to cancelled, new handlers can't be added | |
436 foreach (var handler in m_cancelHandlers) | |
437 handler(); | |
438 } | 454 } |
439 | 455 |
440 return result; | 456 return result; |
441 } | 457 } |
442 } | 458 } |