Mercurial > pub > ImplabNet
annotate Implab/AbstractEvent.cs @ 160:5802131432e4 v2
fixed regression: race condition in Promise
DFA refactoring
author | cin |
---|---|
date | Thu, 18 Feb 2016 19:38:54 +0300 |
parents | 97fbbf816844 |
children | 822aab37b107 |
rev | line source |
---|---|
144 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
145 | 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { |
144 | 8 |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
11 protected const int SUCCEEDED_STATE = 2; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
12 protected const int REJECTED_STATE = 3; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
13 protected const int CANCELLED_STATE = 4; |
144 | 14 |
15 const int CANCEL_NOT_REQUESTED = 0; | |
16 const int CANCEL_REQUESTING = 1; | |
17 const int CANCEL_REQUESTED = 2; | |
18 | |
19 const int RESERVED_HANDLERS_COUNT = 4; | |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
26 THandler[] m_handlers; |
144 | 27 MTQueue<THandler> m_extraHandlers; |
28 int m_handlerPointer = -1; | |
29 int m_handlersCommited; | |
30 | |
31 int m_cancelRequest; | |
32 Exception m_cancelationReason; | |
33 MTQueue<Action<Exception>> m_cancelationHandlers; | |
34 | |
35 | |
36 #region state managment | |
37 bool BeginTransit() { | |
38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
39 } | |
40 | |
41 void CompleteTransit(int state) { | |
42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
44 } | |
45 | |
46 void WaitTransition() { | |
47 while (m_state == TRANSITIONAL_STATE) { | |
48 Thread.MemoryBarrier(); | |
49 } | |
50 } | |
51 | |
52 protected bool BeginSetResult() { | |
53 if (!BeginTransit()) { | |
54 WaitTransition(); | |
55 if (m_state != CANCELLED_STATE) | |
56 throw new InvalidOperationException("The promise is already resolved"); | |
57 return false; | |
58 } | |
59 return true; | |
60 } | |
61 | |
62 protected void EndSetResult() { | |
63 CompleteTransit(SUCCEEDED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
64 Signal(); |
144 | 65 } |
66 | |
67 | |
68 | |
69 /// <summary> | |
70 /// Выполняет обещание, сообщая об ошибке | |
71 /// </summary> | |
72 /// <remarks> | |
73 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
74 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
75 /// будут проигнорированы. | |
76 /// </remarks> | |
77 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
79 protected void SetError(Exception error) { | |
80 if (BeginTransit()) { | |
81 if (error is OperationCanceledException) { | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
82 m_error = error.InnerException; |
144 | 83 CompleteTransit(CANCELLED_STATE); |
84 } else { | |
85 m_error = error is PromiseTransientException ? error.InnerException : error; | |
86 CompleteTransit(REJECTED_STATE); | |
87 } | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
88 Signal(); |
144 | 89 } else { |
90 WaitTransition(); | |
91 if (m_state == SUCCEEDED_STATE) | |
92 throw new InvalidOperationException("The promise is already resolved"); | |
93 } | |
94 } | |
95 | |
96 /// <summary> | |
97 /// Отменяет операцию, если это возможно. | |
98 /// </summary> | |
99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
100 protected void SetCancelled(Exception reason) { | |
101 if (BeginTransit()) { | |
102 m_error = reason; | |
103 CompleteTransit(CANCELLED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
104 Signal(); |
144 | 105 } |
106 } | |
107 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
108 protected abstract void SignalHandler(THandler handler, int signal); |
144 | 109 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
110 void Signal() { |
144 | 111 var hp = m_handlerPointer; |
112 var slot = hp +1 ; | |
113 while (slot < m_handlersCommited) { | |
114 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
115 SignalHandler(m_handlers[slot], m_state); |
144 | 116 } |
117 hp = m_handlerPointer; | |
118 slot = hp +1 ; | |
119 } | |
120 | |
121 | |
122 if (m_extraHandlers != null) { | |
123 THandler handler; | |
124 while (m_extraHandlers.TryDequeue(out handler)) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
125 SignalHandler(handler, m_state); |
144 | 126 } |
127 } | |
128 | |
129 #endregion | |
130 | |
131 protected abstract Signal GetResolveSignal(); | |
132 | |
133 #region synchronization traits | |
134 protected void WaitResult(int timeout) { | |
148 | 135 if (!(IsResolved || GetResolveSignal().Wait(timeout))) |
136 throw new TimeoutException(); | |
144 | 137 |
138 switch (m_state) { | |
139 case SUCCEEDED_STATE: | |
140 return; | |
141 case CANCELLED_STATE: | |
142 throw new OperationCanceledException(); | |
143 case REJECTED_STATE: | |
144 throw new TargetInvocationException(m_error); | |
145 default: | |
146 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
147 } | |
148 } | |
149 #endregion | |
150 | |
151 #region handlers managment | |
152 | |
153 protected void AddHandler(THandler handler) { | |
154 | |
155 if (m_state > 1) { | |
156 // the promise is in the resolved state, just invoke the handler | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
157 SignalHandler(handler, m_state); |
144 | 158 } else { |
159 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
160 | |
161 if (slot < RESERVED_HANDLERS_COUNT) { | |
162 | |
160 | 163 if (slot == 0) { |
164 m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
165 } else { | |
166 while (m_handlers == null) | |
167 Thread.MemoryBarrier(); | |
168 } | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
169 |
144 | 170 m_handlers[slot] = handler; |
171 | |
172 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
173 } | |
174 | |
175 if (m_state > 1) { | |
176 do { | |
177 var hp = m_handlerPointer; | |
178 slot = hp + 1; | |
179 if (slot < m_handlersCommited) { | |
180 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
181 continue; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
182 SignalHandler(m_handlers[slot], m_state); |
144 | 183 } |
184 break; | |
185 } while(true); | |
186 } | |
187 } else { | |
188 if (slot == RESERVED_HANDLERS_COUNT) { | |
189 m_extraHandlers = new MTQueue<THandler>(); | |
190 } else { | |
191 while (m_extraHandlers == null) | |
192 Thread.MemoryBarrier(); | |
193 } | |
194 | |
195 m_extraHandlers.Enqueue(handler); | |
196 | |
197 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
198 // if the promise have been resolved while we was adding the handler to the queue | |
199 // we can't guarantee that someone is still processing it | |
200 // therefore we need to fetch a handler from the queue and execute it | |
201 // note that fetched handler may be not the one that we have added | |
202 // even we can fetch no handlers at all :) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
203 SignalHandler(handler, m_state); |
144 | 204 } |
205 } | |
206 } | |
207 | |
208 #endregion | |
209 | |
210 #region IPromise implementation | |
211 | |
212 public bool IsResolved { | |
213 get { | |
214 Thread.MemoryBarrier(); | |
215 return m_state > 1; | |
216 } | |
217 } | |
218 | |
219 public bool IsCancelled { | |
220 get { | |
221 Thread.MemoryBarrier(); | |
222 return m_state == CANCELLED_STATE; | |
223 } | |
224 } | |
225 | |
226 #endregion | |
227 | |
228 public Exception Error { | |
229 get { | |
230 return m_error; | |
231 } | |
232 } | |
233 | |
145 | 234 public bool CancelOperationIfRequested() { |
235 if (IsCancellationRequested) { | |
236 CancelOperation(CancellationReason); | |
237 return true; | |
238 } | |
239 return false; | |
144 | 240 } |
241 | |
242 public virtual void CancelOperation(Exception reason) { | |
243 SetCancelled(reason); | |
244 } | |
245 | |
145 | 246 public void CancellationRequested(Action<Exception> handler) { |
144 | 247 Safe.ArgumentNotNull(handler, "handler"); |
145 | 248 if (IsCancellationRequested) |
249 handler(CancellationReason); | |
144 | 250 |
251 if (m_cancelationHandlers == null) | |
252 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); | |
253 | |
254 m_cancelationHandlers.Enqueue(handler); | |
255 | |
145 | 256 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) |
144 | 257 // TryDeque implies MemoryBarrier() |
258 handler(m_cancelationReason); | |
259 } | |
260 | |
145 | 261 public bool IsCancellationRequested { |
144 | 262 get { |
263 do { | |
264 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
265 return false; | |
266 if (m_cancelRequest == CANCEL_REQUESTED) | |
267 return true; | |
268 Thread.MemoryBarrier(); | |
269 } while(true); | |
270 } | |
271 } | |
272 | |
145 | 273 public Exception CancellationReason { |
144 | 274 get { |
275 do { | |
276 Thread.MemoryBarrier(); | |
277 } while(m_cancelRequest == CANCEL_REQUESTING); | |
278 | |
279 return m_cancelationReason; | |
280 } | |
281 } | |
282 | |
283 #region ICancellable implementation | |
284 | |
285 public void Cancel() { | |
286 Cancel(null); | |
287 } | |
288 | |
289 public void Cancel(Exception reason) { | |
145 | 290 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { |
144 | 291 m_cancelationReason = reason; |
292 m_cancelRequest = CANCEL_REQUESTED; | |
293 if (m_cancelationHandlers != null) { | |
294 Action<Exception> handler; | |
295 while (m_cancelationHandlers.TryDequeue(out handler)) | |
296 handler(m_cancelationReason); | |
297 } | |
298 } | |
299 } | |
300 | |
301 #endregion | |
302 } | |
303 } | |
304 |