Mercurial > pub > ImplabNet
annotate Implab/AbstractEvent.cs @ 185:822aab37b107 ref20160224
runnable component, work in progress
author | cin |
---|---|
date | Mon, 18 Apr 2016 16:41:17 +0300 |
parents | 5802131432e4 |
children | 75103928da09 |
rev | line source |
---|---|
144 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
145 | 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { |
144 | 8 |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
11 protected const int SUCCEEDED_STATE = 2; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
12 protected const int REJECTED_STATE = 3; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
13 protected const int CANCELLED_STATE = 4; |
144 | 14 |
15 const int CANCEL_NOT_REQUESTED = 0; | |
16 const int CANCEL_REQUESTING = 1; | |
17 const int CANCEL_REQUESTED = 2; | |
18 | |
19 const int RESERVED_HANDLERS_COUNT = 4; | |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
26 THandler[] m_handlers; |
144 | 27 MTQueue<THandler> m_extraHandlers; |
28 int m_handlerPointer = -1; | |
29 int m_handlersCommited; | |
30 | |
31 int m_cancelRequest; | |
32 Exception m_cancelationReason; | |
33 MTQueue<Action<Exception>> m_cancelationHandlers; | |
34 | |
35 | |
36 #region state managment | |
37 bool BeginTransit() { | |
38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
39 } | |
40 | |
41 void CompleteTransit(int state) { | |
42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
44 } | |
45 | |
46 void WaitTransition() { | |
47 while (m_state == TRANSITIONAL_STATE) { | |
48 Thread.MemoryBarrier(); | |
49 } | |
50 } | |
51 | |
52 protected bool BeginSetResult() { | |
53 if (!BeginTransit()) { | |
54 WaitTransition(); | |
55 if (m_state != CANCELLED_STATE) | |
56 throw new InvalidOperationException("The promise is already resolved"); | |
57 return false; | |
58 } | |
59 return true; | |
60 } | |
61 | |
62 protected void EndSetResult() { | |
63 CompleteTransit(SUCCEEDED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
64 Signal(); |
144 | 65 } |
66 | |
67 | |
68 | |
69 /// <summary> | |
70 /// Выполняет обещание, сообщая об ошибке | |
71 /// </summary> | |
72 /// <remarks> | |
73 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
74 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
75 /// будут проигнорированы. | |
76 /// </remarks> | |
77 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
79 protected void SetError(Exception error) { | |
185 | 80 while (error is PromiseTransientException) |
81 error = error.InnerException; | |
82 | |
83 var isCancel = error is OperationCanceledException; | |
84 | |
144 | 85 if (BeginTransit()) { |
185 | 86 m_error = isCancel ? error.InnerException : error; |
87 CompleteTransit(isCancel ? CANCELLED_STATE : REJECTED_STATE); | |
88 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
89 Signal(); |
144 | 90 } else { |
91 WaitTransition(); | |
185 | 92 if (!isCancel || m_state == SUCCEEDED_STATE) |
144 | 93 throw new InvalidOperationException("The promise is already resolved"); |
94 } | |
95 } | |
96 | |
97 /// <summary> | |
98 /// Отменяет операцию, если это возможно. | |
99 /// </summary> | |
100 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
101 protected void SetCancelled(Exception reason) { | |
102 if (BeginTransit()) { | |
103 m_error = reason; | |
104 CompleteTransit(CANCELLED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
105 Signal(); |
144 | 106 } |
107 } | |
108 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
109 protected abstract void SignalHandler(THandler handler, int signal); |
144 | 110 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
111 void Signal() { |
144 | 112 var hp = m_handlerPointer; |
113 var slot = hp +1 ; | |
114 while (slot < m_handlersCommited) { | |
115 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
116 SignalHandler(m_handlers[slot], m_state); |
144 | 117 } |
118 hp = m_handlerPointer; | |
119 slot = hp +1 ; | |
120 } | |
121 | |
122 | |
123 if (m_extraHandlers != null) { | |
124 THandler handler; | |
125 while (m_extraHandlers.TryDequeue(out handler)) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
126 SignalHandler(handler, m_state); |
144 | 127 } |
128 } | |
129 | |
130 #endregion | |
131 | |
132 protected abstract Signal GetResolveSignal(); | |
133 | |
134 #region synchronization traits | |
135 protected void WaitResult(int timeout) { | |
148 | 136 if (!(IsResolved || GetResolveSignal().Wait(timeout))) |
137 throw new TimeoutException(); | |
144 | 138 |
139 switch (m_state) { | |
140 case SUCCEEDED_STATE: | |
141 return; | |
142 case CANCELLED_STATE: | |
143 throw new OperationCanceledException(); | |
144 case REJECTED_STATE: | |
145 throw new TargetInvocationException(m_error); | |
146 default: | |
147 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
148 } | |
149 } | |
150 #endregion | |
151 | |
152 #region handlers managment | |
153 | |
154 protected void AddHandler(THandler handler) { | |
155 | |
156 if (m_state > 1) { | |
157 // the promise is in the resolved state, just invoke the handler | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
158 SignalHandler(handler, m_state); |
144 | 159 } else { |
160 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
161 | |
162 if (slot < RESERVED_HANDLERS_COUNT) { | |
163 | |
160 | 164 if (slot == 0) { |
165 m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
166 } else { | |
167 while (m_handlers == null) | |
168 Thread.MemoryBarrier(); | |
169 } | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
170 |
144 | 171 m_handlers[slot] = handler; |
172 | |
173 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
174 } | |
175 | |
176 if (m_state > 1) { | |
177 do { | |
178 var hp = m_handlerPointer; | |
179 slot = hp + 1; | |
180 if (slot < m_handlersCommited) { | |
181 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
182 continue; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
183 SignalHandler(m_handlers[slot], m_state); |
144 | 184 } |
185 break; | |
186 } while(true); | |
187 } | |
188 } else { | |
189 if (slot == RESERVED_HANDLERS_COUNT) { | |
190 m_extraHandlers = new MTQueue<THandler>(); | |
191 } else { | |
192 while (m_extraHandlers == null) | |
193 Thread.MemoryBarrier(); | |
194 } | |
195 | |
196 m_extraHandlers.Enqueue(handler); | |
197 | |
198 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
199 // if the promise have been resolved while we was adding the handler to the queue | |
200 // we can't guarantee that someone is still processing it | |
201 // therefore we need to fetch a handler from the queue and execute it | |
202 // note that fetched handler may be not the one that we have added | |
203 // even we can fetch no handlers at all :) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
204 SignalHandler(handler, m_state); |
144 | 205 } |
206 } | |
207 } | |
208 | |
209 #endregion | |
210 | |
211 #region IPromise implementation | |
212 | |
213 public bool IsResolved { | |
214 get { | |
215 Thread.MemoryBarrier(); | |
216 return m_state > 1; | |
217 } | |
218 } | |
219 | |
220 public bool IsCancelled { | |
221 get { | |
222 Thread.MemoryBarrier(); | |
223 return m_state == CANCELLED_STATE; | |
224 } | |
225 } | |
226 | |
227 #endregion | |
228 | |
229 public Exception Error { | |
230 get { | |
231 return m_error; | |
232 } | |
233 } | |
234 | |
145 | 235 public bool CancelOperationIfRequested() { |
236 if (IsCancellationRequested) { | |
237 CancelOperation(CancellationReason); | |
238 return true; | |
239 } | |
240 return false; | |
144 | 241 } |
242 | |
243 public virtual void CancelOperation(Exception reason) { | |
244 SetCancelled(reason); | |
245 } | |
246 | |
145 | 247 public void CancellationRequested(Action<Exception> handler) { |
144 | 248 Safe.ArgumentNotNull(handler, "handler"); |
145 | 249 if (IsCancellationRequested) |
250 handler(CancellationReason); | |
144 | 251 |
252 if (m_cancelationHandlers == null) | |
253 Interlocked.CompareExchange(ref m_cancelationHandlers, new MTQueue<Action<Exception>>(), null); | |
254 | |
255 m_cancelationHandlers.Enqueue(handler); | |
256 | |
145 | 257 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) |
144 | 258 // TryDeque implies MemoryBarrier() |
259 handler(m_cancelationReason); | |
260 } | |
261 | |
145 | 262 public bool IsCancellationRequested { |
144 | 263 get { |
264 do { | |
265 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
266 return false; | |
267 if (m_cancelRequest == CANCEL_REQUESTED) | |
268 return true; | |
269 Thread.MemoryBarrier(); | |
270 } while(true); | |
271 } | |
272 } | |
273 | |
145 | 274 public Exception CancellationReason { |
144 | 275 get { |
276 do { | |
277 Thread.MemoryBarrier(); | |
278 } while(m_cancelRequest == CANCEL_REQUESTING); | |
279 | |
280 return m_cancelationReason; | |
281 } | |
282 } | |
283 | |
284 #region ICancellable implementation | |
285 | |
286 public void Cancel() { | |
287 Cancel(null); | |
288 } | |
289 | |
290 public void Cancel(Exception reason) { | |
145 | 291 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { |
144 | 292 m_cancelationReason = reason; |
293 m_cancelRequest = CANCEL_REQUESTED; | |
294 if (m_cancelationHandlers != null) { | |
295 Action<Exception> handler; | |
296 while (m_cancelationHandlers.TryDequeue(out handler)) | |
297 handler(m_cancelationReason); | |
298 } | |
299 } | |
300 } | |
301 | |
302 #endregion | |
303 } | |
304 } | |
305 |