comparison Implab/AbstractEvent.cs @ 144:8c0b95069066 v2

DRAFT: refactoring
author cin
date Fri, 06 Mar 2015 15:45:26 +0300
parents
children 706fccb85524
comparison
equal deleted inserted replaced
143:16f926ee499d 144:8c0b95069066
1 using System;
2 using Implab.Parallels;
3 using System.Threading;
4 using System.Reflection;
5
6 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable {
8
// Values of m_state. The field starts at UNRESOLVED_SATE (the default 0),
// passes through TRANSITIONAL_STATE while a resolving thread is storing the
// outcome (see BeginTransit/CompleteTransit) and ends in one of the three
// final states. Every value greater than TRANSITIONAL_STATE is final.
9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4;
14
// Values of m_cancelRequest. CANCEL_REQUESTING is held only while Cancel()
// is storing the cancellation reason; readers spin until it is replaced.
15 const int CANCEL_NOT_REQUESTED = 0;
16 const int CANCEL_REQUESTING = 1;
17 const int CANCEL_REQUESTED = 2;
18
// Size of the m_handlers array; handlers registered after these slots are
// used up go to m_extraHandlers.
19 const int RESERVED_HANDLERS_COUNT = 4;
20
// Current resolution state, one of the *_STATE constants above.
21 int m_state;
// Rejection error or cancellation reason, passed to SignalError/SignalCancelled.
22 Exception m_error;
// Incremented by every AddHandler call that registers a handler; the value
// before the increment is the slot index assigned to that handler.
23 int m_handlersCount;
24
// Fixed slots for the first RESERVED_HANDLERS_COUNT handlers.
25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
// Overflow queue, created by the AddHandler call that receives slot index
// RESERVED_HANDLERS_COUNT.
26 MTQueue<THandler> m_extraHandlers;
// Index of the last reserved slot claimed for invocation; -1 means none yet.
27 int m_handlerPointer = -1;
// Number of reserved slots whose handler has been stored and published.
28 int m_handlersCommited;
29
// Cancellation request state, one of the CANCEL_* constants above.
30 int m_cancelRequest;
// Reason passed to Cancel(Exception); may be null.
31 Exception m_cancelationReason;
// Callbacks waiting for a cancellation request; created lazily by
// CancelationRequested.
32 MTQueue<Action<Exception>> m_cancelationHandlers;
33
34
35 #region state managment
/// <summary>
/// Atomically moves the object from the unresolved state to the transitional state.
/// </summary>
/// <returns>
/// <c>true</c> if the calling thread performed the switch and therefore owns the
/// transition; <c>false</c> if the object was not in the unresolved state.
/// </returns>
bool BeginTransit() {
    var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
    return previous == UNRESOLVED_SATE;
}
39
/// <summary>
/// Atomically replaces the transitional state with the specified final state.
/// </summary>
/// <param name="state">The state to store in place of the transitional one.</param>
/// <exception cref="InvalidOperationException">The object is not in the transitional state.</exception>
void CompleteTransit(int state) {
    var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
    if (previous == TRANSITIONAL_STATE)
        return;

    throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
44
/// <summary>
/// Spins until the object is no longer in the transitional state.
/// </summary>
void WaitTransition() {
    for (;;) {
        if (m_state != TRANSITIONAL_STATE)
            return;

        // the barrier makes every iteration re-read the state field
        Thread.MemoryBarrier();
    }
}
50
/// <summary>
/// Tries to start resolving the promise with a result.
/// </summary>
/// <returns>
/// <c>true</c> if the caller has acquired the transition and must finish it with
/// <see cref="EndSetResult"/>; <c>false</c> if the promise has already been cancelled.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The promise has already been resolved with a state other than cancelled.
/// </exception>
protected bool BeginSetResult() {
    if (BeginTransit())
        return true;

    // somebody else is resolving or has resolved the promise: wait for the final state
    WaitTransition();

    if (m_state == CANCELLED_STATE)
        return false;

    throw new InvalidOperationException("The promise is already resolved");
}
60
/// <summary>
/// Finishes the transition started by <see cref="BeginSetResult"/>: marks the
/// promise as succeeded and then notifies the registered handlers.
/// </summary>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
protected void EndSetResult() {
    // the final state is published first, the handlers are signalled afterwards
    CompleteTransit(SUCCEEDED_STATE);
    OnSuccess();
}
65
66
67
/// <summary>
/// Resolves the promise by reporting an error.
/// </summary>
/// <remarks>
/// The promise has to work in a multithreaded environment, so several threads may
/// report an error at the same time. Only the first one is used as the result,
/// the others are ignored.
/// An <see cref="OperationCanceledException"/> moves the promise to the cancelled
/// state with its <see cref="Exception.InnerException"/> as the reason; a
/// <c>PromiseTransientException</c> is unwrapped to its inner exception.
/// </remarks>
/// <param name="error">The exception raised while performing the operation.</param>
/// <exception cref="InvalidOperationException">The promise has already been resolved successfully.</exception>
protected void SetError(Exception error) {
    if (!BeginTransit()) {
        WaitTransition();
        if (m_state == SUCCEEDED_STATE)
            throw new InvalidOperationException("The promise is already resolved");
        return;
    }

    if (error is OperationCanceledException) {
        // Store the reason BEFORE publishing the final state, otherwise a thread
        // that observes CANCELLED_STATE may still read a missing reason.
        m_error = error.InnerException;
        CompleteTransit(CANCELLED_STATE);
        OnCancelled();
    } else {
        m_error = error is PromiseTransientException ? error.InnerException : error;
        CompleteTransit(REJECTED_STATE);
        OnError();
    }
}
95
/// <summary>
/// Cancels the operation if that is still possible.
/// </summary>
/// <remarks>
/// Use the <see cref="IsCancelled"/> property to find out whether the operation
/// has actually been cancelled. If the promise is already resolved or is being
/// resolved by another thread, the call does nothing.
/// </remarks>
/// <param name="reason">The reason of the cancellation, passed to the handlers.</param>
protected void SetCancelled(Exception reason) {
    if (!BeginTransit())
        return;

    m_error = reason;
    CompleteTransit(CANCELLED_STATE);
    OnCancelled();
}
107
/// <summary>
/// Notifies a single handler that the promise has been resolved successfully.
/// Called for every registered handler once the state becomes succeeded.
/// </summary>
/// <param name="handler">The handler to notify.</param>
108 protected abstract void SignalSuccess(THandler handler);
109
/// <summary>
/// Notifies a single handler that the promise has been rejected.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="error">The stored error of the promise.</param>
110 protected abstract void SignalError(THandler handler, Exception error);
111
/// <summary>
/// Notifies a single handler that the promise has been cancelled.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="reason">The stored cancellation reason; may be null.</param>
112 protected abstract void SignalCancelled(THandler handler, Exception reason);
113
/// <summary>
/// Signals success to all handlers registered so far: first the reserved slots,
/// then the overflow queue.
/// </summary>
void OnSuccess() {
    // Reserved slots are claimed one at a time by advancing m_handlerPointer with
    // a CAS, so a slot is signalled only by the thread that won the claim.
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalSuccess(m_handlers[next]);
    }

    if (m_extraHandlers == null)
        return;

    THandler extra;
    while (m_extraHandlers.TryDequeue(out extra))
        SignalSuccess(extra);
}
132
/// <summary>
/// Signals the stored error to all handlers registered so far: first the
/// reserved slots, then the overflow queue.
/// </summary>
void OnError() {
    // Reserved slots are claimed one at a time by advancing m_handlerPointer with
    // a CAS, so a slot is signalled only by the thread that won the claim.
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalError(m_handlers[next], m_error);
    }

    if (m_extraHandlers == null)
        return;

    THandler extra;
    while (m_extraHandlers.TryDequeue(out extra))
        SignalError(extra, m_error);
}
150
/// <summary>
/// Signals the cancellation, together with the stored reason, to all handlers
/// registered so far: first the reserved slots, then the overflow queue.
/// </summary>
void OnCancelled() {
    // Reserved slots are claimed one at a time by advancing m_handlerPointer with
    // a CAS, so a slot is signalled only by the thread that won the claim.
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalCancelled(m_handlers[next], m_error);
    }

    if (m_extraHandlers == null)
        return;

    THandler extra;
    while (m_extraHandlers.TryDequeue(out extra))
        SignalCancelled(extra, m_error);
}
168
169 #endregion
170
/// <summary>
/// Returns the signal on which <see cref="WaitResult"/> blocks while the promise
/// is not resolved yet.
/// </summary>
/// <remarks>
/// NOTE(review): implementations are presumably expected to set the returned
/// signal when the promise gets resolved - confirm against the derived classes.
/// </remarks>
171 protected abstract Signal GetResolveSignal();
172
173 #region synchronization traits
/// <summary>
/// Blocks the calling thread until the promise is resolved and converts the
/// final state into either a normal return or an exception.
/// </summary>
/// <param name="timeout">
/// The timeout passed to the resolve signal. NOTE(review): presumably
/// milliseconds, with the signal throwing on expiry - confirm against Signal.Wait.
/// </param>
/// <exception cref="OperationCanceledException">
/// The promise has been cancelled; the cancellation reason, if any, is available
/// as the inner exception.
/// </exception>
/// <exception cref="TargetInvocationException">The promise has been rejected; the error is the inner exception.</exception>
/// <exception cref="ApplicationException">The promise is still not in a final state after the wait has returned.</exception>
protected void WaitResult(int timeout) {
    if (!IsResolved)
        GetResolveSignal().Wait(timeout);

    // Read the state once so that the value being dispatched on is also the
    // value reported in the error message.
    var state = m_state;

    switch (state) {
        case SUCCEEDED_STATE:
            return;
        case CANCELLED_STATE:
            // keep the cancellation reason instead of silently dropping it
            throw new OperationCanceledException("The operation has been cancelled", m_error);
        case REJECTED_STATE:
            throw new TargetInvocationException(m_error);
        default:
            throw new ApplicationException(String.Format("Invalid promise state {0}", state));
    }
}
189 #endregion
190
191 #region handlers managment
192
/// <summary>
/// Registers a handler. If the promise is already resolved the handler is
/// invoked immediately on the calling thread; otherwise it is stored in one of
/// the reserved slots or, when those are used up, in the overflow queue.
/// </summary>
/// <param name="handler">The handler to register.</param>
protected void AddHandler(THandler handler) {
    if (m_state > TRANSITIONAL_STATE) {
        // the promise is in the resolved state, just invoke the handler
        InvokeHandler(handler);
        return;
    }

    var index = Interlocked.Increment(ref m_handlersCount) - 1;

    if (index >= RESERVED_HANDLERS_COUNT) {
        if (index == RESERVED_HANDLERS_COUNT) {
            // the first overflowing registration creates the queue...
            m_extraHandlers = new MTQueue<THandler>();
        } else {
            // ...the others wait until it becomes visible
            while (m_extraHandlers == null)
                Thread.MemoryBarrier();
        }

        m_extraHandlers.Enqueue(handler);

        // If the promise has been resolved while the handler was being enqueued,
        // nobody may be draining the queue any more. Fetch one handler and run it
        // here; it is not necessarily the one just added, and there may be none.
        THandler pending;
        if (m_state > TRANSITIONAL_STATE && m_extraHandlers.TryDequeue(out pending))
            InvokeHandler(pending);
        return;
    }

    m_handlers[index] = handler;

    // publish the slots strictly in order: wait until all previous slots are committed
    while (Interlocked.CompareExchange(ref m_handlersCommited, index + 1, index) != index) {
    }

    if (m_state <= TRANSITIONAL_STATE)
        return;

    // The promise has been resolved meanwhile: claim one committed, not yet
    // signalled slot (if there is any) and invoke its handler.
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            return;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed) {
            InvokeHandler(m_handlers[next]);
            return;
        }
    }
}
240
/// <summary>
/// Signals the specified handler according to the current final state of the promise.
/// </summary>
/// <param name="handler">The handler to signal.</param>
/// <exception cref="Exception">The promise is not in a final state.</exception>
protected void InvokeHandler(THandler handler) {
    switch (m_state) {
        case REJECTED_STATE:
            SignalError(handler, m_error);
            return;
        case CANCELLED_STATE:
            SignalCancelled(handler, m_error);
            return;
        case SUCCEEDED_STATE:
            SignalSuccess(handler);
            return;
    }

    throw new Exception(String.Format("Invalid promise state {0}", m_state));
}
256
257 #endregion
258
259 #region IPromise implementation
260
/// <summary>
/// Gets a value indicating whether the promise has reached a final state
/// (succeeded, rejected or cancelled).
/// </summary>
public bool IsResolved {
    get {
        Thread.MemoryBarrier();
        var state = m_state;
        // every state above the transitional one is final
        return state > TRANSITIONAL_STATE;
    }
}
267
/// <summary>
/// Gets a value indicating whether the promise has been resolved as cancelled.
/// </summary>
public bool IsCancelled {
    get {
        Thread.MemoryBarrier();
        var state = m_state;
        return state == CANCELLED_STATE;
    }
}
274
275 #endregion
276
/// <summary>
/// Gets the stored error of a rejected promise or the reason of a cancelled one;
/// <c>null</c> when nothing has been stored.
/// </summary>
public Exception Error {
    get {
        var stored = m_error;
        return stored;
    }
}
282
/// <summary>
/// Accepts a pending cancellation request, if there is one, by calling
/// <see cref="CancelOperation"/> with the requested reason.
/// </summary>
/// <returns>
/// <c>true</c> if cancellation has been requested and <see cref="CancelOperation"/>
/// was invoked; otherwise <c>false</c>. A <c>true</c> result does not by itself
/// mean that the promise ended up cancelled: check <see cref="IsCancelled"/> for that.
/// </returns>
public bool AcceptIfRequested() {
    if (!IsCancelRequested)
        return false;

    CancelOperation(CancelReason);
    return true;
}
287
/// <summary>
/// Cancels the operation. The default implementation moves the promise to the
/// cancelled state unless it is already resolved or being resolved.
/// </summary>
/// <param name="reason">The reason of the cancellation; may be null.</param>
public virtual void CancelOperation(Exception reason) {
    this.SetCancelled(reason);
}
291
/// <summary>
/// Registers a callback to be invoked once cancellation of the operation is
/// requested. If cancellation has already been requested the callback is invoked
/// immediately on the calling thread.
/// </summary>
/// <param name="handler">The callback; receives the cancellation reason, which may be null.</param>
public void CancelationRequested(Action<Exception> handler) {
    Safe.ArgumentNotNull(handler, "handler");

    if (IsCancelRequested) {
        // Already requested: invoke once and stop. Falling through would enqueue
        // the same handler and invoke it a second time below.
        handler(CancelReason);
        return;
    }

    if (m_cancelationHandlers == null)
        Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null);

    m_cancelationHandlers.Enqueue(handler);

    // Cancel() may have drained the queue before the handler was enqueued, so
    // re-check and run one pending handler here (not necessarily the one added).
    if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler))
        // TryDequeue implies MemoryBarrier()
        handler(m_cancelationReason);
}
306
/// <summary>
/// Gets a value indicating whether cancellation of the operation has been
/// requested. While a request is being registered by another thread the getter
/// spins until the registration is complete.
/// </summary>
public bool IsCancelRequested {
    get {
        while (true) {
            switch (m_cancelRequest) {
                case CANCEL_NOT_REQUESTED:
                    return false;
                case CANCEL_REQUESTED:
                    return true;
            }

            // the request is being registered right now: re-read after a barrier
            Thread.MemoryBarrier();
        }
    }
}
318
/// <summary>
/// Gets the reason passed with the cancellation request, or <c>null</c> if none
/// has been stored. While a request is being registered by another thread the
/// getter spins until the reason has been stored.
/// </summary>
public Exception CancelReason {
    get {
        Thread.MemoryBarrier();
        while (m_cancelRequest == CANCEL_REQUESTING)
            Thread.MemoryBarrier();

        return m_cancelationReason;
    }
}
328
329 #region ICancellable implementation
330
/// <summary>
/// Requests cancellation of the operation without specifying a reason.
/// </summary>
public void Cancel() {
    // no reason given: delegate to the overload taking the reason
    this.Cancel(null);
}
334
/// <summary>
/// Requests cancellation of the operation. Only the first request takes effect:
/// it stores the reason and invokes the callbacks registered with
/// <see cref="CancelationRequested"/>; later requests are ignored.
/// </summary>
/// <param name="reason">The reason of the cancellation; may be null.</param>
public void Cancel(Exception reason) {
    // CompareExchange needs the comparand: switch NOT_REQUESTED -> REQUESTING
    // atomically, so that exactly one caller registers the request.
    if (CANCEL_NOT_REQUESTED != Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED))
        return;

    // the reason is stored before the request is published as complete
    m_cancelationReason = reason;
    m_cancelRequest = CANCEL_REQUESTED;

    if (m_cancelationHandlers == null)
        return;

    Action<Exception> handler;
    while (m_cancelationHandlers.TryDequeue(out handler))
        handler(m_cancelationReason);
}
346
347 #endregion
348 }
349 }
350