Mercurial > pub > ImplabNet
comparison Implab/AbstractEvent.cs @ 144:8c0b95069066 v2
DRAFT: refactoring
author | cin |
---|---|
date | Fri, 06 Mar 2015 15:45:26 +0300 |
parents | |
children | 706fccb85524 |
comparison
equal
deleted
inserted
replaced
143:16f926ee499d | 144:8c0b95069066 |
---|---|
1 using System; | |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
7 public abstract class AbstractEvent<THandler> : ICancelationToken, ICancellable { | |
8 | |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
11 const int SUCCEEDED_STATE = 2; | |
12 const int REJECTED_STATE = 3; | |
13 const int CANCELLED_STATE = 4; | |
14 | |
15 const int CANCEL_NOT_REQUESTED = 0; | |
16 const int CANCEL_REQUESTING = 1; | |
17 const int CANCEL_REQUESTED = 2; | |
18 | |
19 const int RESERVED_HANDLERS_COUNT = 4; | |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
26 MTQueue<THandler> m_extraHandlers; | |
27 int m_handlerPointer = -1; | |
28 int m_handlersCommited; | |
29 | |
30 int m_cancelRequest; | |
31 Exception m_cancelationReason; | |
32 MTQueue<Action<Exception>> m_cancelationHandlers; | |
33 | |
34 | |
35 #region state managment | |
36 bool BeginTransit() { | |
37 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
38 } | |
39 | |
40 void CompleteTransit(int state) { | |
41 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
42 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
43 } | |
44 | |
45 void WaitTransition() { | |
46 while (m_state == TRANSITIONAL_STATE) { | |
47 Thread.MemoryBarrier(); | |
48 } | |
49 } | |
50 | |
51 protected bool BeginSetResult() { | |
52 if (!BeginTransit()) { | |
53 WaitTransition(); | |
54 if (m_state != CANCELLED_STATE) | |
55 throw new InvalidOperationException("The promise is already resolved"); | |
56 return false; | |
57 } | |
58 return true; | |
59 } | |
60 | |
61 protected void EndSetResult() { | |
62 CompleteTransit(SUCCEEDED_STATE); | |
63 OnSuccess(); | |
64 } | |
65 | |
66 | |
67 | |
68 /// <summary> | |
69 /// Выполняет обещание, сообщая об ошибке | |
70 /// </summary> | |
71 /// <remarks> | |
72 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
73 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
74 /// будут проигнорированы. | |
75 /// </remarks> | |
76 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
77 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
78 protected void SetError(Exception error) { | |
79 if (BeginTransit()) { | |
80 if (error is OperationCanceledException) { | |
81 CompleteTransit(CANCELLED_STATE); | |
82 m_error = error.InnerException; | |
83 OnCancelled(); | |
84 } else { | |
85 m_error = error is PromiseTransientException ? error.InnerException : error; | |
86 CompleteTransit(REJECTED_STATE); | |
87 OnError(); | |
88 } | |
89 } else { | |
90 WaitTransition(); | |
91 if (m_state == SUCCEEDED_STATE) | |
92 throw new InvalidOperationException("The promise is already resolved"); | |
93 } | |
94 } | |
95 | |
96 /// <summary> | |
97 /// Отменяет операцию, если это возможно. | |
98 /// </summary> | |
99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
100 protected void SetCancelled(Exception reason) { | |
101 if (BeginTransit()) { | |
102 m_error = reason; | |
103 CompleteTransit(CANCELLED_STATE); | |
104 OnCancelled(); | |
105 } | |
106 } | |
107 | |
108 protected abstract void SignalSuccess(THandler handler); | |
109 | |
110 protected abstract void SignalError(THandler handler, Exception error); | |
111 | |
112 protected abstract void SignalCancelled(THandler handler, Exception reason); | |
113 | |
114 void OnSuccess() { | |
115 var hp = m_handlerPointer; | |
116 var slot = hp +1 ; | |
117 while (slot < m_handlersCommited) { | |
118 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
119 SignalSuccess(m_handlers[slot]); | |
120 } | |
121 hp = m_handlerPointer; | |
122 slot = hp +1 ; | |
123 } | |
124 | |
125 | |
126 if (m_extraHandlers != null) { | |
127 THandler handler; | |
128 while (m_extraHandlers.TryDequeue(out handler)) | |
129 SignalSuccess(handler); | |
130 } | |
131 } | |
132 | |
133 void OnError() { | |
134 var hp = m_handlerPointer; | |
135 var slot = hp +1 ; | |
136 while (slot < m_handlersCommited) { | |
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
138 SignalError(m_handlers[slot],m_error); | |
139 } | |
140 hp = m_handlerPointer; | |
141 slot = hp +1 ; | |
142 } | |
143 | |
144 if (m_extraHandlers != null) { | |
145 THandler handler; | |
146 while (m_extraHandlers.TryDequeue(out handler)) | |
147 SignalError(handler, m_error); | |
148 } | |
149 } | |
150 | |
151 void OnCancelled() { | |
152 var hp = m_handlerPointer; | |
153 var slot = hp +1 ; | |
154 while (slot < m_handlersCommited) { | |
155 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156 SignalCancelled(m_handlers[slot], m_error); | |
157 } | |
158 hp = m_handlerPointer; | |
159 slot = hp +1 ; | |
160 } | |
161 | |
162 if (m_extraHandlers != null) { | |
163 THandler handler; | |
164 while (m_extraHandlers.TryDequeue(out handler)) | |
165 SignalCancelled(handler, m_error); | |
166 } | |
167 } | |
168 | |
169 #endregion | |
170 | |
171 protected abstract Signal GetResolveSignal(); | |
172 | |
173 #region synchronization traits | |
174 protected void WaitResult(int timeout) { | |
175 if (!IsResolved) | |
176 GetResolveSignal().Wait(timeout); | |
177 | |
178 switch (m_state) { | |
179 case SUCCEEDED_STATE: | |
180 return; | |
181 case CANCELLED_STATE: | |
182 throw new OperationCanceledException(); | |
183 case REJECTED_STATE: | |
184 throw new TargetInvocationException(m_error); | |
185 default: | |
186 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
187 } | |
188 } | |
189 #endregion | |
190 | |
191 #region handlers managment | |
192 | |
193 protected void AddHandler(THandler handler) { | |
194 | |
195 if (m_state > 1) { | |
196 // the promise is in the resolved state, just invoke the handler | |
197 InvokeHandler(handler); | |
198 } else { | |
199 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
200 | |
201 if (slot < RESERVED_HANDLERS_COUNT) { | |
202 | |
203 m_handlers[slot] = handler; | |
204 | |
205 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
206 } | |
207 | |
208 if (m_state > 1) { | |
209 do { | |
210 var hp = m_handlerPointer; | |
211 slot = hp + 1; | |
212 if (slot < m_handlersCommited) { | |
213 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
214 continue; | |
215 InvokeHandler(m_handlers[slot]); | |
216 } | |
217 break; | |
218 } while(true); | |
219 } | |
220 } else { | |
221 if (slot == RESERVED_HANDLERS_COUNT) { | |
222 m_extraHandlers = new MTQueue<THandler>(); | |
223 } else { | |
224 while (m_extraHandlers == null) | |
225 Thread.MemoryBarrier(); | |
226 } | |
227 | |
228 m_extraHandlers.Enqueue(handler); | |
229 | |
230 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
231 // if the promise have been resolved while we was adding the handler to the queue | |
232 // we can't guarantee that someone is still processing it | |
233 // therefore we need to fetch a handler from the queue and execute it | |
234 // note that fetched handler may be not the one that we have added | |
235 // even we can fetch no handlers at all :) | |
236 InvokeHandler(handler); | |
237 } | |
238 } | |
239 } | |
240 | |
241 protected void InvokeHandler(THandler handler) { | |
242 switch (m_state) { | |
243 case SUCCEEDED_STATE: | |
244 SignalSuccess(handler); | |
245 break; | |
246 case CANCELLED_STATE: | |
247 SignalCancelled(handler, m_error); | |
248 break; | |
249 case REJECTED_STATE: | |
250 SignalError(handler, m_error); | |
251 break; | |
252 default: | |
253 throw new Exception(String.Format("Invalid promise state {0}", m_state)); | |
254 } | |
255 } | |
256 | |
257 #endregion | |
258 | |
259 #region IPromise implementation | |
260 | |
261 public bool IsResolved { | |
262 get { | |
263 Thread.MemoryBarrier(); | |
264 return m_state > 1; | |
265 } | |
266 } | |
267 | |
268 public bool IsCancelled { | |
269 get { | |
270 Thread.MemoryBarrier(); | |
271 return m_state == CANCELLED_STATE; | |
272 } | |
273 } | |
274 | |
275 #endregion | |
276 | |
277 public Exception Error { | |
278 get { | |
279 return m_error; | |
280 } | |
281 } | |
282 | |
283 public bool AcceptIfRequested() { | |
284 if (IsCancelRequested) | |
285 CancelOperation(CancelReason); | |
286 } | |
287 | |
288 public virtual void CancelOperation(Exception reason) { | |
289 SetCancelled(reason); | |
290 } | |
291 | |
292 public void CancelationRequested(Action<Exception> handler) { | |
293 Safe.ArgumentNotNull(handler, "handler"); | |
294 if (IsCancelRequested) | |
295 handler(CancelReason); | |
296 | |
297 if (m_cancelationHandlers == null) | |
298 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); | |
299 | |
300 m_cancelationHandlers.Enqueue(handler); | |
301 | |
302 if (IsCancelRequested && m_cancelationHandlers.TryDequeue(out handler)) | |
303 // TryDeque implies MemoryBarrier() | |
304 handler(m_cancelationReason); | |
305 } | |
306 | |
307 public bool IsCancelRequested { | |
308 get { | |
309 do { | |
310 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
311 return false; | |
312 if (m_cancelRequest == CANCEL_REQUESTED) | |
313 return true; | |
314 Thread.MemoryBarrier(); | |
315 } while(true); | |
316 } | |
317 } | |
318 | |
319 public Exception CancelReason { | |
320 get { | |
321 do { | |
322 Thread.MemoryBarrier(); | |
323 } while(m_cancelRequest == CANCEL_REQUESTING); | |
324 | |
325 return m_cancelationReason; | |
326 } | |
327 } | |
328 | |
329 #region ICancellable implementation | |
330 | |
331 public void Cancel() { | |
332 Cancel(null); | |
333 } | |
334 | |
335 public void Cancel(Exception reason) { | |
336 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING)) { | |
337 m_cancelationReason = reason; | |
338 m_cancelRequest = CANCEL_REQUESTED; | |
339 if (m_cancelationHandlers != null) { | |
340 Action<Exception> handler; | |
341 while (m_cancelationHandlers.TryDequeue(out handler)) | |
342 handler(m_cancelationReason); | |
343 } | |
344 } | |
345 } | |
346 | |
347 #endregion | |
348 } | |
349 } | |
350 |