Mercurial > pub > ImplabNet
annotate Implab/AbstractEvent.cs @ 240:fa6cbf4d8841 v3
refactoring, moving to dotnercore, simplifying promises
author | cin |
---|---|
date | Tue, 23 Jan 2018 19:39:21 +0300 |
parents | d6fe09f5592c |
children | cbe10ac0731e |
rev | line source |
---|---|
144 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
240
fa6cbf4d8841
refactoring, moving to dotnercore, simplifying promises
cin
parents:
233
diff
changeset
|
7 public abstract class AbstractEvent<THandler> : ICancellable { |
144 | 8 |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
11 protected const int SUCCEEDED_STATE = 2; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
12 protected const int REJECTED_STATE = 3; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
13 protected const int CANCELLED_STATE = 4; |
144 | 14 |
15 const int CANCEL_NOT_REQUESTED = 0; | |
16 const int CANCEL_REQUESTING = 1; | |
17 const int CANCEL_REQUESTED = 2; | |
18 | |
19 const int RESERVED_HANDLERS_COUNT = 4; | |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
26 THandler[] m_handlers; |
233 | 27 SimpleAsyncQueue<THandler> m_extraHandlers; |
144 | 28 int m_handlerPointer = -1; |
29 int m_handlersCommited; | |
30 | |
31 int m_cancelRequest; | |
32 Exception m_cancelationReason; | |
33 | |
34 #region state managment | |
35 bool BeginTransit() { | |
36 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
37 } | |
38 | |
39 void CompleteTransit(int state) { | |
40 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
41 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
42 } | |
43 | |
44 void WaitTransition() { | |
45 while (m_state == TRANSITIONAL_STATE) { | |
46 Thread.MemoryBarrier(); | |
47 } | |
48 } | |
49 | |
50 protected bool BeginSetResult() { | |
51 if (!BeginTransit()) { | |
52 WaitTransition(); | |
53 if (m_state != CANCELLED_STATE) | |
54 throw new InvalidOperationException("The promise is already resolved"); | |
55 return false; | |
56 } | |
57 return true; | |
58 } | |
59 | |
60 protected void EndSetResult() { | |
61 CompleteTransit(SUCCEEDED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
62 Signal(); |
144 | 63 } |
64 | |
65 | |
66 | |
67 /// <summary> | |
68 /// Выполняет обещание, сообщая об ошибке | |
69 /// </summary> | |
70 /// <remarks> | |
71 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
72 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
73 /// будут проигнорированы. | |
74 /// </remarks> | |
75 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
76 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
77 protected void SetError(Exception error) { | |
78 if (BeginTransit()) { | |
186 | 79 m_error = error; |
80 CompleteTransit(REJECTED_STATE); | |
185 | 81 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
82 Signal(); |
144 | 83 } else { |
84 WaitTransition(); | |
186 | 85 if (m_state == SUCCEEDED_STATE) |
144 | 86 throw new InvalidOperationException("The promise is already resolved"); |
87 } | |
88 } | |
89 | |
90 /// <summary> | |
91 /// Отменяет операцию, если это возможно. | |
92 /// </summary> | |
93 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
94 protected void SetCancelled(Exception reason) { | |
95 if (BeginTransit()) { | |
96 m_error = reason; | |
97 CompleteTransit(CANCELLED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
98 Signal(); |
144 | 99 } |
100 } | |
101 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
102 protected abstract void SignalHandler(THandler handler, int signal); |
144 | 103 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
104 void Signal() { |
144 | 105 var hp = m_handlerPointer; |
106 var slot = hp +1 ; | |
107 while (slot < m_handlersCommited) { | |
108 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
109 SignalHandler(m_handlers[slot], m_state); |
144 | 110 } |
111 hp = m_handlerPointer; | |
112 slot = hp +1 ; | |
113 } | |
114 | |
115 | |
116 if (m_extraHandlers != null) { | |
117 THandler handler; | |
118 while (m_extraHandlers.TryDequeue(out handler)) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
119 SignalHandler(handler, m_state); |
144 | 120 } |
121 } | |
122 | |
123 #endregion | |
124 | |
125 protected abstract Signal GetResolveSignal(); | |
126 | |
127 #region synchronization traits | |
128 protected void WaitResult(int timeout) { | |
148 | 129 if (!(IsResolved || GetResolveSignal().Wait(timeout))) |
130 throw new TimeoutException(); | |
144 | 131 |
132 switch (m_state) { | |
133 case SUCCEEDED_STATE: | |
134 return; | |
135 case CANCELLED_STATE: | |
186 | 136 throw new OperationCanceledException("The operation has been cancelled", m_error); |
144 | 137 case REJECTED_STATE: |
138 throw new TargetInvocationException(m_error); | |
139 default: | |
186 | 140 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); |
144 | 141 } |
142 } | |
143 #endregion | |
144 | |
145 #region handlers managment | |
146 | |
147 protected void AddHandler(THandler handler) { | |
148 | |
149 if (m_state > 1) { | |
150 // the promise is in the resolved state, just invoke the handler | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
151 SignalHandler(handler, m_state); |
144 | 152 } else { |
153 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
154 | |
155 if (slot < RESERVED_HANDLERS_COUNT) { | |
156 | |
160 | 157 if (slot == 0) { |
158 m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
159 } else { | |
160 while (m_handlers == null) | |
161 Thread.MemoryBarrier(); | |
162 } | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
163 |
144 | 164 m_handlers[slot] = handler; |
165 | |
166 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
167 } | |
168 | |
169 if (m_state > 1) { | |
170 do { | |
171 var hp = m_handlerPointer; | |
172 slot = hp + 1; | |
173 if (slot < m_handlersCommited) { | |
174 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
175 continue; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
176 SignalHandler(m_handlers[slot], m_state); |
144 | 177 } |
178 break; | |
179 } while(true); | |
180 } | |
181 } else { | |
182 if (slot == RESERVED_HANDLERS_COUNT) { | |
233 | 183 m_extraHandlers = new SimpleAsyncQueue<THandler>(); |
144 | 184 } else { |
185 while (m_extraHandlers == null) | |
186 Thread.MemoryBarrier(); | |
187 } | |
188 | |
189 m_extraHandlers.Enqueue(handler); | |
190 | |
191 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
192 // if the promise have been resolved while we was adding the handler to the queue | |
193 // we can't guarantee that someone is still processing it | |
194 // therefore we need to fetch a handler from the queue and execute it | |
195 // note that fetched handler may be not the one that we have added | |
196 // even we can fetch no handlers at all :) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
197 SignalHandler(handler, m_state); |
144 | 198 } |
199 } | |
200 } | |
201 | |
202 #endregion | |
203 | |
204 #region IPromise implementation | |
205 | |
206 public bool IsResolved { | |
207 get { | |
208 Thread.MemoryBarrier(); | |
209 return m_state > 1; | |
210 } | |
211 } | |
212 | |
213 public bool IsCancelled { | |
214 get { | |
215 Thread.MemoryBarrier(); | |
216 return m_state == CANCELLED_STATE; | |
217 } | |
218 } | |
219 | |
220 #endregion | |
221 | |
222 public Exception Error { | |
223 get { | |
224 return m_error; | |
225 } | |
226 } | |
227 | |
145 | 228 public bool CancelOperationIfRequested() { |
229 if (IsCancellationRequested) { | |
230 CancelOperation(CancellationReason); | |
231 return true; | |
232 } | |
233 return false; | |
144 | 234 } |
235 | |
236 public virtual void CancelOperation(Exception reason) { | |
237 SetCancelled(reason); | |
238 } | |
239 | |
145 | 240 public void CancellationRequested(Action<Exception> handler) { |
144 | 241 Safe.ArgumentNotNull(handler, "handler"); |
145 | 242 if (IsCancellationRequested) |
243 handler(CancellationReason); | |
144 | 244 |
245 if (m_cancelationHandlers == null) | |
233 | 246 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); |
144 | 247 |
248 m_cancelationHandlers.Enqueue(handler); | |
249 | |
145 | 250 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) |
144 | 251 // TryDeque implies MemoryBarrier() |
252 handler(m_cancelationReason); | |
253 } | |
254 | |
145 | 255 public bool IsCancellationRequested { |
144 | 256 get { |
257 do { | |
258 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
259 return false; | |
260 if (m_cancelRequest == CANCEL_REQUESTED) | |
261 return true; | |
262 Thread.MemoryBarrier(); | |
263 } while(true); | |
264 } | |
265 } | |
266 | |
145 | 267 public Exception CancellationReason { |
144 | 268 get { |
269 do { | |
270 Thread.MemoryBarrier(); | |
271 } while(m_cancelRequest == CANCEL_REQUESTING); | |
272 | |
273 return m_cancelationReason; | |
274 } | |
275 } | |
276 | |
277 #region ICancellable implementation | |
278 | |
279 public void Cancel() { | |
280 Cancel(null); | |
281 } | |
282 | |
283 public void Cancel(Exception reason) { | |
145 | 284 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { |
144 | 285 m_cancelationReason = reason; |
286 m_cancelRequest = CANCEL_REQUESTED; | |
287 if (m_cancelationHandlers != null) { | |
288 Action<Exception> handler; | |
289 while (m_cancelationHandlers.TryDequeue(out handler)) | |
290 handler(m_cancelationReason); | |
291 } | |
292 } | |
293 } | |
294 | |
295 #endregion | |
296 } | |
297 } | |
298 |