Mercurial > pub > ImplabNet
annotate Implab/AbstractEvent.cs @ 233:d6fe09f5592c v2
Improved AsyncQueue
Removed ImplabFx
author | cin |
---|---|
date | Wed, 04 Oct 2017 15:44:47 +0300 |
parents | 75103928da09 |
children | fa6cbf4d8841 |
rev | line source |
---|---|
144 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
145 | 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { |
144 | 8 |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
11 protected const int SUCCEEDED_STATE = 2; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
12 protected const int REJECTED_STATE = 3; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
13 protected const int CANCELLED_STATE = 4; |
144 | 14 |
15 const int CANCEL_NOT_REQUESTED = 0; | |
16 const int CANCEL_REQUESTING = 1; | |
17 const int CANCEL_REQUESTED = 2; | |
18 | |
19 const int RESERVED_HANDLERS_COUNT = 4; | |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
26 THandler[] m_handlers; |
233 | 27 SimpleAsyncQueue<THandler> m_extraHandlers; |
144 | 28 int m_handlerPointer = -1; |
29 int m_handlersCommited; | |
30 | |
31 int m_cancelRequest; | |
32 Exception m_cancelationReason; | |
233 | 33 SimpleAsyncQueue<Action<Exception>> m_cancelationHandlers; |
144 | 34 |
35 | |
36 #region state managment | |
37 bool BeginTransit() { | |
38 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
39 } | |
40 | |
41 void CompleteTransit(int state) { | |
42 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
43 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
44 } | |
45 | |
46 void WaitTransition() { | |
47 while (m_state == TRANSITIONAL_STATE) { | |
48 Thread.MemoryBarrier(); | |
49 } | |
50 } | |
51 | |
52 protected bool BeginSetResult() { | |
53 if (!BeginTransit()) { | |
54 WaitTransition(); | |
55 if (m_state != CANCELLED_STATE) | |
56 throw new InvalidOperationException("The promise is already resolved"); | |
57 return false; | |
58 } | |
59 return true; | |
60 } | |
61 | |
62 protected void EndSetResult() { | |
63 CompleteTransit(SUCCEEDED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
64 Signal(); |
144 | 65 } |
66 | |
67 | |
68 | |
69 /// <summary> | |
70 /// Выполняет обещание, сообщая об ошибке | |
71 /// </summary> | |
72 /// <remarks> | |
73 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
74 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
75 /// будут проигнорированы. | |
76 /// </remarks> | |
77 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
79 protected void SetError(Exception error) { | |
80 if (BeginTransit()) { | |
186 | 81 m_error = error; |
82 CompleteTransit(REJECTED_STATE); | |
185 | 83 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
84 Signal(); |
144 | 85 } else { |
86 WaitTransition(); | |
186 | 87 if (m_state == SUCCEEDED_STATE) |
144 | 88 throw new InvalidOperationException("The promise is already resolved"); |
89 } | |
90 } | |
91 | |
92 /// <summary> | |
93 /// Отменяет операцию, если это возможно. | |
94 /// </summary> | |
95 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
96 protected void SetCancelled(Exception reason) { | |
97 if (BeginTransit()) { | |
98 m_error = reason; | |
99 CompleteTransit(CANCELLED_STATE); | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
100 Signal(); |
144 | 101 } |
102 } | |
103 | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
104 protected abstract void SignalHandler(THandler handler, int signal); |
144 | 105 |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
106 void Signal() { |
144 | 107 var hp = m_handlerPointer; |
108 var slot = hp +1 ; | |
109 while (slot < m_handlersCommited) { | |
110 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
111 SignalHandler(m_handlers[slot], m_state); |
144 | 112 } |
113 hp = m_handlerPointer; | |
114 slot = hp +1 ; | |
115 } | |
116 | |
117 | |
118 if (m_extraHandlers != null) { | |
119 THandler handler; | |
120 while (m_extraHandlers.TryDequeue(out handler)) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
121 SignalHandler(handler, m_state); |
144 | 122 } |
123 } | |
124 | |
125 #endregion | |
126 | |
127 protected abstract Signal GetResolveSignal(); | |
128 | |
129 #region synchronization traits | |
130 protected void WaitResult(int timeout) { | |
148 | 131 if (!(IsResolved || GetResolveSignal().Wait(timeout))) |
132 throw new TimeoutException(); | |
144 | 133 |
134 switch (m_state) { | |
135 case SUCCEEDED_STATE: | |
136 return; | |
137 case CANCELLED_STATE: | |
186 | 138 throw new OperationCanceledException("The operation has been cancelled", m_error); |
144 | 139 case REJECTED_STATE: |
140 throw new TargetInvocationException(m_error); | |
141 default: | |
186 | 142 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); |
144 | 143 } |
144 } | |
145 #endregion | |
146 | |
147 #region handlers managment | |
148 | |
149 protected void AddHandler(THandler handler) { | |
150 | |
151 if (m_state > 1) { | |
152 // the promise is in the resolved state, just invoke the handler | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
153 SignalHandler(handler, m_state); |
144 | 154 } else { |
155 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
156 | |
157 if (slot < RESERVED_HANDLERS_COUNT) { | |
158 | |
160 | 159 if (slot == 0) { |
160 m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
161 } else { | |
162 while (m_handlers == null) | |
163 Thread.MemoryBarrier(); | |
164 } | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
165 |
144 | 166 m_handlers[slot] = handler; |
167 | |
168 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
169 } | |
170 | |
171 if (m_state > 1) { | |
172 do { | |
173 var hp = m_handlerPointer; | |
174 slot = hp + 1; | |
175 if (slot < m_handlersCommited) { | |
176 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
177 continue; | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
178 SignalHandler(m_handlers[slot], m_state); |
144 | 179 } |
180 break; | |
181 } while(true); | |
182 } | |
183 } else { | |
184 if (slot == RESERVED_HANDLERS_COUNT) { | |
233 | 185 m_extraHandlers = new SimpleAsyncQueue<THandler>(); |
144 | 186 } else { |
187 while (m_extraHandlers == null) | |
188 Thread.MemoryBarrier(); | |
189 } | |
190 | |
191 m_extraHandlers.Enqueue(handler); | |
192 | |
193 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
194 // if the promise have been resolved while we was adding the handler to the queue | |
195 // we can't guarantee that someone is still processing it | |
196 // therefore we need to fetch a handler from the queue and execute it | |
197 // note that fetched handler may be not the one that we have added | |
198 // even we can fetch no handlers at all :) | |
156
97fbbf816844
Promises: SignalXXX methods merged into SignalHandler method.
cin
parents:
148
diff
changeset
|
199 SignalHandler(handler, m_state); |
144 | 200 } |
201 } | |
202 } | |
203 | |
204 #endregion | |
205 | |
206 #region IPromise implementation | |
207 | |
208 public bool IsResolved { | |
209 get { | |
210 Thread.MemoryBarrier(); | |
211 return m_state > 1; | |
212 } | |
213 } | |
214 | |
215 public bool IsCancelled { | |
216 get { | |
217 Thread.MemoryBarrier(); | |
218 return m_state == CANCELLED_STATE; | |
219 } | |
220 } | |
221 | |
222 #endregion | |
223 | |
224 public Exception Error { | |
225 get { | |
226 return m_error; | |
227 } | |
228 } | |
229 | |
145 | 230 public bool CancelOperationIfRequested() { |
231 if (IsCancellationRequested) { | |
232 CancelOperation(CancellationReason); | |
233 return true; | |
234 } | |
235 return false; | |
144 | 236 } |
237 | |
238 public virtual void CancelOperation(Exception reason) { | |
239 SetCancelled(reason); | |
240 } | |
241 | |
145 | 242 public void CancellationRequested(Action<Exception> handler) { |
144 | 243 Safe.ArgumentNotNull(handler, "handler"); |
145 | 244 if (IsCancellationRequested) |
245 handler(CancellationReason); | |
144 | 246 |
247 if (m_cancelationHandlers == null) | |
233 | 248 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); |
144 | 249 |
250 m_cancelationHandlers.Enqueue(handler); | |
251 | |
145 | 252 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) |
144 | 253 // TryDeque implies MemoryBarrier() |
254 handler(m_cancelationReason); | |
255 } | |
256 | |
145 | 257 public bool IsCancellationRequested { |
144 | 258 get { |
259 do { | |
260 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
261 return false; | |
262 if (m_cancelRequest == CANCEL_REQUESTED) | |
263 return true; | |
264 Thread.MemoryBarrier(); | |
265 } while(true); | |
266 } | |
267 } | |
268 | |
145 | 269 public Exception CancellationReason { |
144 | 270 get { |
271 do { | |
272 Thread.MemoryBarrier(); | |
273 } while(m_cancelRequest == CANCEL_REQUESTING); | |
274 | |
275 return m_cancelationReason; | |
276 } | |
277 } | |
278 | |
279 #region ICancellable implementation | |
280 | |
281 public void Cancel() { | |
282 Cancel(null); | |
283 } | |
284 | |
285 public void Cancel(Exception reason) { | |
145 | 286 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { |
144 | 287 m_cancelationReason = reason; |
288 m_cancelRequest = CANCEL_REQUESTED; | |
289 if (m_cancelationHandlers != null) { | |
290 Action<Exception> handler; | |
291 while (m_cancelationHandlers.TryDequeue(out handler)) | |
292 handler(m_cancelationReason); | |
293 } | |
294 } | |
295 } | |
296 | |
297 #endregion | |
298 } | |
299 } | |
300 |