comparison Implab/AbstractPromise.cs @ 144:8c0b95069066 v2

DRAFT: refactoring
author cin
date Fri, 06 Mar 2015 15:45:26 +0300
parents 16f926ee499d
children 706fccb85524
comparison
equal deleted inserted replaced
143:16f926ee499d 144:8c0b95069066
1 using System; 1 using System;
2 using Implab.Parallels; 2 using Implab.Parallels;
3 using System.Threading;
4 using System.Reflection;
5 3
6 namespace Implab { 4 namespace Implab {
7 public abstract class AbstractPromise<THandler> { 5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise {
6 public struct HandlerDescriptor {
7 readonly Action m_handler;
8 readonly Action<Exception> m_error;
9 readonly Action<Exception> m_cancel;
10 readonly PromiseEventType m_mask;
8 11
9 const int UNRESOLVED_SATE = 0; 12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) {
10 const int TRANSITIONAL_STATE = 1; 13 m_handler = success;
11 const int SUCCEEDED_STATE = 2; 14 m_error = error;
12 const int REJECTED_STATE = 3; 15 m_cancel = cancel;
13 const int CANCELLED_STATE = 4; 16 m_mask = PromiseEventType.Success;
17 }
14 18
15 const int RESERVED_HANDLERS_COUNT = 4; 19 public HandlerDescriptor(Action handler, PromiseEventType mask) {
20 m_handler = handler;
21 m_mask = mask;
22 }
16 23
17 int m_state; 24 public void SignalSuccess() {
18 Exception m_error; 25 if (m_mask & PromiseEventType.Success && m_handler != null) {
19 int m_handlersCount; 26 try {
27 m_handler();
28 } catch (Exception err) {
29 // avoid calling handler twice in case of error
30 if (m_error != null)
31 SignalError(err);
32 }
33 }
34 }
20 35
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; 36 public void SignalError(Exception err) {
22 MTQueue<THandler> m_extraHandlers; 37 if (m_error != null) {
23 int m_handlerPointer = -1; 38 try {
24 int m_handlersCommited; 39 m_error(err);
40 // Analysis disable once EmptyGeneralCatchClause
41 } catch {
42 }
43 } else if (m_mask & PromiseEventType.Error && m_handler != null) {
44 try {
45 m_handler();
46 // Analysis disable once EmptyGeneralCatchClause
47 } catch {
48 }
49 }
50 }
25 51
26 #region state managment 52 public void SignalCancel(Exception reason) {
27 bool BeginTransit() { 53 if (m_cancel != null) {
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); 54 try {
29 } 55 m_cancel(reason);
30 56 } catch (Exception err) {
31 void CompleteTransit(int state) { 57 SignalError(err);
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) 58 }
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); 59 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) {
34 } 60 try {
35 61 m_handler();
36 void WaitTransition() { 62 // Analysis disable once EmptyGeneralCatchClause
37 while (m_state == TRANSITIONAL_STATE) { 63 } catch {
38 Thread.MemoryBarrier(); 64 }
65 }
39 } 66 }
40 } 67 }
41 68
42 protected bool BeginSetResult() { 69
43 if (!BeginTransit()) { 70 #region implemented abstract members of AbstractPromise
44 WaitTransition(); 71
45 if (m_state != CANCELLED_STATE) 72 protected override void SignalSuccess(HandlerDescriptor handler) {
46 throw new InvalidOperationException("The promise is already resolved"); 73 handler.SignalSuccess();
47 return false;
48 }
49 return true;
50 } 74 }
51 75
52 protected void EndSetResult() { 76 protected override void SignalError(HandlerDescriptor handler, Exception error) {
53 CompleteTransit(SUCCEEDED_STATE); 77 handler.SignalError(error);
54 OnSuccess();
55 } 78 }
56 79
57 80 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) {
58 81 handler.SignalCancel(reason);
59 /// <summary>
60 /// Выполняет обещание, сообщая об ошибке
61 /// </summary>
62 /// <remarks>
63 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
64 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
65 /// будут проигнорированы.
66 /// </remarks>
67 /// <param name="error">Исключение возникшее при выполнении операции</param>
68 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
69 protected void SetError(Exception error) {
70 if (BeginTransit()) {
71 if (error is OperationCanceledException) {
72 CompleteTransit(CANCELLED_STATE);
73 m_error = error.InnerException;
74 OnCancelled();
75 } else {
76 m_error = error is PromiseTransientException ? error.InnerException : error;
77 CompleteTransit(REJECTED_STATE);
78 OnError();
79 }
80 } else {
81 WaitTransition();
82 if (m_state == SUCCEEDED_STATE)
83 throw new InvalidOperationException("The promise is already resolved");
84 }
85 } 82 }
86 83
87 /// <summary> 84 protected override Signal GetResolveSignal() {
88 /// Отменяет операцию, если это возможно. 85 var signal = new Signal();
89 /// </summary> 86 On(signal.Set, PromiseEventType.All);
90 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
91 protected void SetCancelled(Exception reason) {
92 if (BeginTransit()) {
93 m_error = reason;
94 CompleteTransit(CANCELLED_STATE);
95 OnCancelled();
96 }
97 }
98
99 protected abstract void SignalSuccess(THandler handler);
100
101 protected abstract void SignalError(THandler handler, Exception error);
102
103 protected abstract void SignalCancelled(THandler handler, Exception reason);
104
105 void OnSuccess() {
106 var hp = m_handlerPointer;
107 var slot = hp +1 ;
108 while (slot < m_handlersCommited) {
109 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
110 SignalSuccess(m_handlers[slot]);
111 }
112 hp = m_handlerPointer;
113 slot = hp +1 ;
114 }
115
116
117 if (m_extraHandlers != null) {
118 THandler handler;
119 while (m_extraHandlers.TryDequeue(out handler))
120 SignalSuccess(handler);
121 }
122 }
123
124 void OnError() {
125 var hp = m_handlerPointer;
126 var slot = hp +1 ;
127 while (slot < m_handlersCommited) {
128 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
129 SignalError(m_handlers[slot],m_error);
130 }
131 hp = m_handlerPointer;
132 slot = hp +1 ;
133 }
134
135 if (m_extraHandlers != null) {
136 THandler handler;
137 while (m_extraHandlers.TryDequeue(out handler))
138 SignalError(handler, m_error);
139 }
140 }
141
142 void OnCancelled() {
143 var hp = m_handlerPointer;
144 var slot = hp +1 ;
145 while (slot < m_handlersCommited) {
146 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
147 SignalCancelled(m_handlers[slot], m_error);
148 }
149 hp = m_handlerPointer;
150 slot = hp +1 ;
151 }
152
153 if (m_extraHandlers != null) {
154 THandler handler;
155 while (m_extraHandlers.TryDequeue(out handler))
156 SignalCancelled(handler, m_error);
157 }
158 } 87 }
159 88
160 #endregion 89 #endregion
161 90
162 protected abstract void Listen(PromiseEventType events, Action handler);
163 91
164 #region synchronization traits 92 public Type PromiseType {
165 protected void WaitResult(int timeout) { 93 get {
166 if (!IsResolved) { 94 return typeof(void);
167 var lk = new object();
168
169 Listen(PromiseEventType.All, () => {
170 lock(lk) {
171 Monitor.Pulse(lk);
172 }
173 });
174
175 lock (lk) {
176 while(!IsResolved) {
177 if(!Monitor.Wait(lk,timeout))
178 throw new TimeoutException();
179 }
180 }
181
182 }
183 switch (m_state) {
184 case SUCCEEDED_STATE:
185 return;
186 case CANCELLED_STATE:
187 throw new OperationCanceledException();
188 case REJECTED_STATE:
189 throw new TargetInvocationException(m_error);
190 default:
191 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
192 }
193 }
194 #endregion
195
196 #region handlers managment
197
198 protected void AddHandler(THandler handler) {
199
200 if (m_state > 1) {
201 // the promise is in the resolved state, just invoke the handler
202 InvokeHandler(handler);
203 } else {
204 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
205
206 if (slot < RESERVED_HANDLERS_COUNT) {
207 m_handlers[slot] = handler;
208
209 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
210 }
211
212 if (m_state > 1) {
213 do {
214 var hp = m_handlerPointer;
215 slot = hp + 1;
216 if (slot < m_handlersCommited) {
217 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
218 continue;
219 InvokeHandler(m_handlers[slot]);
220 }
221 break;
222 } while(true);
223 }
224 } else {
225 if (slot == RESERVED_HANDLERS_COUNT) {
226 m_extraHandlers = new MTQueue<THandler>();
227 } else {
228 while (m_extraHandlers == null)
229 Thread.MemoryBarrier();
230 }
231
232 m_extraHandlers.Enqueue(handler);
233
234 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
235 // if the promise have been resolved while we was adding the handler to the queue
236 // we can't guarantee that someone is still processing it
237 // therefore we need to fetch a handler from the queue and execute it
238 // note that fetched handler may be not the one that we have added
239 // even we can fetch no handlers at all :)
240 InvokeHandler(handler);
241 }
242 } 95 }
243 } 96 }
244 97
245 protected void InvokeHandler(THandler handler) { 98 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) {
246 switch (m_state) { 99 AddHandler(new HandlerDescriptor(success, error, cancel));
247 case SUCCEEDED_STATE: 100 return this;
248 SignalSuccess(handler);
249 break;
250 case CANCELLED_STATE:
251 SignalCancelled(handler, m_error);
252 break;
253 case REJECTED_STATE:
254 SignalError(handler, m_error);
255 break;
256 default:
257 throw new Exception(String.Format("Invalid promise state {0}", m_state));
258 }
259 } 101 }
260 102
261 #endregion 103 public IPromise On(Action success, Action<Exception> error) {
104 AddHandler(new HandlerDescriptor(success, error, null));
105 return this;
106 }
262 107
263 #region IPromise implementation 108 public IPromise On(Action success) {
109 AddHandler(new HandlerDescriptor(success, null, null));
110 return this;
111 }
264 112
265 public void Join(int timeout) { 113 public IPromise On(Action handler, PromiseEventType events) {
266 WaitResult(timeout); 114 AddHandler(new HandlerDescriptor(handler,events));
115 return this;
116 }
117
118 public IPromise<T> Cast<T>() {
119 throw new InvalidCastException();
267 } 120 }
268 121
269 public void Join() { 122 public void Join() {
270 WaitResult(-1); 123 WaitResult(-1);
271 } 124 }
272 125
273 public bool IsResolved { 126 public void Join(int timeout) {
274 get { 127 WaitResult(timeout);
275 Thread.MemoryBarrier();
276 return m_state > 1;
277 }
278 } 128 }
279 129
280 public bool IsCancelled { 130 protected void SetResult() {
281 get { 131 BeginSetResult();
282 Thread.MemoryBarrier(); 132 EndSetResult();
283 return m_state == CANCELLED_STATE;
284 }
285 }
286
287 #endregion
288
289 public Exception Error {
290 get {
291 return m_error;
292 }
293 } 133 }
294 } 134 }
295 } 135 }
296 136