Mercurial > pub > ImplabNet
comparison Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3
Working on promises
author | cin |
---|---|
date | Wed, 24 Jan 2018 03:03:21 +0300 |
parents | fa6cbf4d8841 |
children | b1e0ffdf3451 |
comparison
equal
deleted
inserted
replaced
240:fa6cbf4d8841 | 242:cbe10ac0731e |
---|---|
1 using System; | 1 using System; |
2 using Implab.Parallels; | 2 using Implab.Parallels; |
3 using System.Threading; | 3 using System.Threading; |
4 using System.Reflection; | 4 using System.Reflection; |
5 using System.Diagnostics; | |
5 | 6 |
6 namespace Implab { | 7 namespace Implab { |
7 public abstract class AbstractEvent<THandler> : ICancellable { | 8 public abstract class AbstractEvent<THandler> where THandler : class { |
8 | 9 |
9 const int UNRESOLVED_SATE = 0; | 10 const int PENDING_SATE = 0; |
10 const int TRANSITIONAL_STATE = 1; | 11 protected const int TRANSITIONAL_STATE = 1; |
12 | |
11 protected const int SUCCEEDED_STATE = 2; | 13 protected const int SUCCEEDED_STATE = 2; |
12 protected const int REJECTED_STATE = 3; | 14 protected const int REJECTED_STATE = 3; |
13 protected const int CANCELLED_STATE = 4; | |
14 | 15 |
15 const int CANCEL_NOT_REQUESTED = 0; | 16 volatile int m_state; |
16 const int CANCEL_REQUESTING = 1; | 17 Exception m_error; |
17 const int CANCEL_REQUESTED = 2; | |
18 | 18 |
19 const int RESERVED_HANDLERS_COUNT = 4; | 19 THandler m_handler; |
20 | |
21 int m_state; | |
22 Exception m_error; | |
23 int m_handlersCount; | |
24 | |
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
26 THandler[] m_handlers; | |
27 SimpleAsyncQueue<THandler> m_extraHandlers; | 20 SimpleAsyncQueue<THandler> m_extraHandlers; |
28 int m_handlerPointer = -1; | |
29 int m_handlersCommited; | |
30 | |
31 int m_cancelRequest; | |
32 Exception m_cancelationReason; | |
33 | 21 |
34 #region state managment | 22 #region state managment |
35 bool BeginTransit() { | 23 protected bool BeginTransit() { |
36 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | 24 return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE); |
37 } | 25 } |
38 | 26 |
39 void CompleteTransit(int state) { | 27 protected void CompleteTransit(int state) { |
28 #if DEBUG | |
40 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | 29 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) |
41 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | 30 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); |
31 #else | |
32 m_state = state; | |
33 #endif | |
34 Signal(); | |
42 } | 35 } |
43 | 36 |
44 void WaitTransition() { | 37 protected void WaitTransition() { |
45 while (m_state == TRANSITIONAL_STATE) { | 38 if (m_state == TRANSITIONAL_STATE) { |
46 Thread.MemoryBarrier(); | 39 SpinWait spin; |
40 do { | |
41 spin.SpinOnce(); | |
42 } while (m_state == TRANSITIONAL_STATE); | |
47 } | 43 } |
48 } | 44 } |
49 | 45 |
50 protected bool BeginSetResult() { | 46 protected bool BeginSetResult() { |
51 if (!BeginTransit()) { | 47 if (!BeginTransit()) { |
52 WaitTransition(); | 48 WaitTransition(); |
53 if (m_state != CANCELLED_STATE) | |
54 throw new InvalidOperationException("The promise is already resolved"); | |
55 return false; | 49 return false; |
56 } | 50 } |
57 return true; | 51 return true; |
58 } | 52 } |
59 | 53 |
60 protected void EndSetResult() { | 54 protected void EndSetResult() { |
61 CompleteTransit(SUCCEEDED_STATE); | 55 CompleteTransit(SUCCEEDED_STATE); |
62 Signal(); | |
63 } | 56 } |
64 | 57 |
65 | 58 |
66 | 59 |
67 /// <summary> | 60 /// <summary> |
76 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | 69 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
77 protected void SetError(Exception error) { | 70 protected void SetError(Exception error) { |
78 if (BeginTransit()) { | 71 if (BeginTransit()) { |
79 m_error = error; | 72 m_error = error; |
80 CompleteTransit(REJECTED_STATE); | 73 CompleteTransit(REJECTED_STATE); |
81 | |
82 Signal(); | |
83 } else { | 74 } else { |
84 WaitTransition(); | 75 WaitTransition(); |
85 if (m_state == SUCCEEDED_STATE) | 76 if (m_state == SUCCEEDED_STATE) |
86 throw new InvalidOperationException("The promise is already resolved"); | 77 throw new InvalidOperationException("The promise is already resolved"); |
87 } | 78 } |
88 } | 79 } |
89 | 80 |
90 /// <summary> | |
91 /// Отменяет операцию, если это возможно. | |
92 /// </summary> | |
93 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
94 protected void SetCancelled(Exception reason) { | |
95 if (BeginTransit()) { | |
96 m_error = reason; | |
97 CompleteTransit(CANCELLED_STATE); | |
98 Signal(); | |
99 } | |
100 } | |
101 | |
102 protected abstract void SignalHandler(THandler handler, int signal); | 81 protected abstract void SignalHandler(THandler handler, int signal); |
103 | 82 |
104 void Signal() { | 83 void Signal() { |
105 var hp = m_handlerPointer; | 84 THandler handler; |
106 var slot = hp +1 ; | 85 while (TryDequeueHandler(out handler)) |
107 while (slot < m_handlersCommited) { | 86 SignalHandler(handler, m_state); |
108 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
109 SignalHandler(m_handlers[slot], m_state); | |
110 } | |
111 hp = m_handlerPointer; | |
112 slot = hp +1 ; | |
113 } | |
114 | |
115 | |
116 if (m_extraHandlers != null) { | |
117 THandler handler; | |
118 while (m_extraHandlers.TryDequeue(out handler)) | |
119 SignalHandler(handler, m_state); | |
120 } | |
121 } | 87 } |
122 | 88 |
123 #endregion | 89 #endregion |
124 | 90 |
125 protected abstract Signal GetResolveSignal(); | 91 protected abstract Signal GetFulfillSignal(); |
126 | 92 |
127 #region synchronization traits | 93 #region synchronization traits |
128 protected void WaitResult(int timeout) { | 94 protected void WaitResult(int timeout) { |
129 if (!(IsResolved || GetResolveSignal().Wait(timeout))) | 95 if (!(IsFulfilled || GetFulfillSignal().Wait(timeout))) |
130 throw new TimeoutException(); | 96 throw new TimeoutException(); |
131 | 97 |
132 switch (m_state) { | 98 if (IsRejected) |
133 case SUCCEEDED_STATE: | 99 Rethrow(); |
134 return; | 100 } |
135 case CANCELLED_STATE: | 101 |
136 throw new OperationCanceledException("The operation has been cancelled", m_error); | 102 protected void Rethrow() { |
137 case REJECTED_STATE: | 103 Debug.Assert(m_error != null); |
138 throw new TargetInvocationException(m_error); | 104 if (m_error is OperationCanceledException) |
139 default: | 105 throw new OperationCanceledException("Operation cancelled", m_error); |
140 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); | 106 else |
141 } | 107 throw new TargetInvocationException(m_error); |
142 } | 108 } |
143 #endregion | 109 #endregion |
144 | 110 |
145 #region handlers managment | 111 #region handlers managment |
146 | 112 |
148 | 114 |
149 if (m_state > 1) { | 115 if (m_state > 1) { |
150 // the promise is in the resolved state, just invoke the handler | 116 // the promise is in the resolved state, just invoke the handler |
151 SignalHandler(handler, m_state); | 117 SignalHandler(handler, m_state); |
152 } else { | 118 } else { |
153 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | 119 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) { |
120 if (m_extraHandlers == null) | |
121 // compare-exchange will fprotect from loosing already created queue | |
122 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null); | |
123 m_extraHandlers.Enqueue(handler); | |
124 } | |
154 | 125 |
155 if (slot < RESERVED_HANDLERS_COUNT) { | 126 if (m_state > 1 && TryDequeueHandler(out handler)) |
156 | |
157 if (slot == 0) { | |
158 m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | |
159 } else { | |
160 while (m_handlers == null) | |
161 Thread.MemoryBarrier(); | |
162 } | |
163 | |
164 m_handlers[slot] = handler; | |
165 | |
166 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
167 } | |
168 | |
169 if (m_state > 1) { | |
170 do { | |
171 var hp = m_handlerPointer; | |
172 slot = hp + 1; | |
173 if (slot < m_handlersCommited) { | |
174 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
175 continue; | |
176 SignalHandler(m_handlers[slot], m_state); | |
177 } | |
178 break; | |
179 } while(true); | |
180 } | |
181 } else { | |
182 if (slot == RESERVED_HANDLERS_COUNT) { | |
183 m_extraHandlers = new SimpleAsyncQueue<THandler>(); | |
184 } else { | |
185 while (m_extraHandlers == null) | |
186 Thread.MemoryBarrier(); | |
187 } | |
188 | |
189 m_extraHandlers.Enqueue(handler); | |
190 | |
191 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
192 // if the promise have been resolved while we was adding the handler to the queue | 127 // if the promise have been resolved while we was adding the handler to the queue |
193 // we can't guarantee that someone is still processing it | 128 // we can't guarantee that someone is still processing it |
194 // therefore we need to fetch a handler from the queue and execute it | 129 // therefore we need to fetch a handler from the queue and execute it |
195 // note that fetched handler may be not the one that we have added | 130 // note that fetched handler may be not the one that we have added |
196 // even we can fetch no handlers at all :) | 131 // even we can fetch no handlers at all :) |
197 SignalHandler(handler, m_state); | 132 SignalHandler(handler, m_state); |
198 } | |
199 } | 133 } |
134 | |
135 } | |
136 | |
137 bool TryDequeueHandler(out THandler handler) { | |
138 handler = Interlocked.Exchange(ref m_handler, null); | |
139 if (handler != null) | |
140 return true; | |
141 return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler); | |
200 } | 142 } |
201 | 143 |
202 #endregion | 144 #endregion |
203 | 145 |
204 #region IPromise implementation | 146 #region IPromise implementation |
205 | 147 |
206 public bool IsResolved { | 148 public bool IsFulfilled { |
207 get { | 149 get { |
208 Thread.MemoryBarrier(); | 150 return m_state > TRANSITIONAL_STATE; |
209 return m_state > 1; | |
210 } | 151 } |
211 } | 152 } |
212 | 153 |
213 public bool IsCancelled { | 154 public bool IsRejected { |
214 get { | 155 get { |
215 Thread.MemoryBarrier(); | 156 return m_state == REJECTED_STATE; |
216 return m_state == CANCELLED_STATE; | |
217 } | 157 } |
218 } | 158 } |
219 | 159 |
220 #endregion | 160 #endregion |
221 | 161 |
222 public Exception Error { | 162 public Exception RejectReason { |
223 get { | 163 get { |
224 return m_error; | 164 return m_error; |
225 } | 165 } |
226 } | 166 } |
227 | 167 |
228 public bool CancelOperationIfRequested() { | |
229 if (IsCancellationRequested) { | |
230 CancelOperation(CancellationReason); | |
231 return true; | |
232 } | |
233 return false; | |
234 } | |
235 | |
236 public virtual void CancelOperation(Exception reason) { | |
237 SetCancelled(reason); | |
238 } | |
239 | |
240 public void CancellationRequested(Action<Exception> handler) { | |
241 Safe.ArgumentNotNull(handler, "handler"); | |
242 if (IsCancellationRequested) | |
243 handler(CancellationReason); | |
244 | |
245 if (m_cancelationHandlers == null) | |
246 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null); | |
247 | |
248 m_cancelationHandlers.Enqueue(handler); | |
249 | |
250 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler)) | |
251 // TryDeque implies MemoryBarrier() | |
252 handler(m_cancelationReason); | |
253 } | |
254 | |
255 public bool IsCancellationRequested { | |
256 get { | |
257 do { | |
258 if (m_cancelRequest == CANCEL_NOT_REQUESTED) | |
259 return false; | |
260 if (m_cancelRequest == CANCEL_REQUESTED) | |
261 return true; | |
262 Thread.MemoryBarrier(); | |
263 } while(true); | |
264 } | |
265 } | |
266 | |
267 public Exception CancellationReason { | |
268 get { | |
269 do { | |
270 Thread.MemoryBarrier(); | |
271 } while(m_cancelRequest == CANCEL_REQUESTING); | |
272 | |
273 return m_cancelationReason; | |
274 } | |
275 } | |
276 | |
277 #region ICancellable implementation | |
278 | |
279 public void Cancel() { | |
280 Cancel(null); | |
281 } | |
282 | |
283 public void Cancel(Exception reason) { | |
284 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) { | |
285 m_cancelationReason = reason; | |
286 m_cancelRequest = CANCEL_REQUESTED; | |
287 if (m_cancelationHandlers != null) { | |
288 Action<Exception> handler; | |
289 while (m_cancelationHandlers.TryDequeue(out handler)) | |
290 handler(m_cancelationReason); | |
291 } | |
292 } | |
293 } | |
294 | |
295 #endregion | |
296 } | 168 } |
297 } | 169 } |
298 | 170 |