comparison Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3

Working on promises
author cin
date Wed, 24 Jan 2018 03:03:21 +0300
parents fa6cbf4d8841
children b1e0ffdf3451
comparison
equal deleted inserted replaced
240:fa6cbf4d8841 242:cbe10ac0731e
1 using System; 1 using System;
2 using Implab.Parallels; 2 using Implab.Parallels;
3 using System.Threading; 3 using System.Threading;
4 using System.Reflection; 4 using System.Reflection;
5 using System.Diagnostics;
5 6
6 namespace Implab { 7 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancellable { 8 public abstract class AbstractEvent<THandler> where THandler : class {
8 9
9 const int UNRESOLVED_SATE = 0; 10 const int PENDING_SATE = 0;
10 const int TRANSITIONAL_STATE = 1; 11 protected const int TRANSITIONAL_STATE = 1;
12
11 protected const int SUCCEEDED_STATE = 2; 13 protected const int SUCCEEDED_STATE = 2;
12 protected const int REJECTED_STATE = 3; 14 protected const int REJECTED_STATE = 3;
13 protected const int CANCELLED_STATE = 4;
14 15
15 const int CANCEL_NOT_REQUESTED = 0; 16 volatile int m_state;
16 const int CANCEL_REQUESTING = 1; 17 Exception m_error;
17 const int CANCEL_REQUESTED = 2;
18 18
19 const int RESERVED_HANDLERS_COUNT = 4; 19 THandler m_handler;
20
21 int m_state;
22 Exception m_error;
23 int m_handlersCount;
24
25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 THandler[] m_handlers;
27 SimpleAsyncQueue<THandler> m_extraHandlers; 20 SimpleAsyncQueue<THandler> m_extraHandlers;
28 int m_handlerPointer = -1;
29 int m_handlersCommited;
30
31 int m_cancelRequest;
32 Exception m_cancelationReason;
33 21
34 #region state managment 22 #region state managment
35 bool BeginTransit() { 23 protected bool BeginTransit() {
36 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); 24 return PENDING_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_SATE);
37 } 25 }
38 26
39 void CompleteTransit(int state) { 27 protected void CompleteTransit(int state) {
28 #if DEBUG
40 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) 29 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
41 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); 30 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
31 #else
32 m_state = state;
33 #endif
34 Signal();
42 } 35 }
43 36
44 void WaitTransition() { 37 protected void WaitTransition() {
45 while (m_state == TRANSITIONAL_STATE) { 38 if (m_state == TRANSITIONAL_STATE) {
46 Thread.MemoryBarrier(); 39 SpinWait spin;
40 do {
41 spin.SpinOnce();
42 } while (m_state == TRANSITIONAL_STATE);
47 } 43 }
48 } 44 }
49 45
50 protected bool BeginSetResult() { 46 protected bool BeginSetResult() {
51 if (!BeginTransit()) { 47 if (!BeginTransit()) {
52 WaitTransition(); 48 WaitTransition();
53 if (m_state != CANCELLED_STATE)
54 throw new InvalidOperationException("The promise is already resolved");
55 return false; 49 return false;
56 } 50 }
57 return true; 51 return true;
58 } 52 }
59 53
60 protected void EndSetResult() { 54 protected void EndSetResult() {
61 CompleteTransit(SUCCEEDED_STATE); 55 CompleteTransit(SUCCEEDED_STATE);
62 Signal();
63 } 56 }
64 57
65 58
66 59
67 /// <summary> 60 /// <summary>
76 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 69 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
77 protected void SetError(Exception error) { 70 protected void SetError(Exception error) {
78 if (BeginTransit()) { 71 if (BeginTransit()) {
79 m_error = error; 72 m_error = error;
80 CompleteTransit(REJECTED_STATE); 73 CompleteTransit(REJECTED_STATE);
81
82 Signal();
83 } else { 74 } else {
84 WaitTransition(); 75 WaitTransition();
85 if (m_state == SUCCEEDED_STATE) 76 if (m_state == SUCCEEDED_STATE)
86 throw new InvalidOperationException("The promise is already resolved"); 77 throw new InvalidOperationException("The promise is already resolved");
87 } 78 }
88 } 79 }
89 80
90 /// <summary>
91 /// Отменяет операцию, если это возможно.
92 /// </summary>
93 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
94 protected void SetCancelled(Exception reason) {
95 if (BeginTransit()) {
96 m_error = reason;
97 CompleteTransit(CANCELLED_STATE);
98 Signal();
99 }
100 }
101
102 protected abstract void SignalHandler(THandler handler, int signal); 81 protected abstract void SignalHandler(THandler handler, int signal);
103 82
104 void Signal() { 83 void Signal() {
105 var hp = m_handlerPointer; 84 THandler handler;
106 var slot = hp +1 ; 85 while (TryDequeueHandler(out handler))
107 while (slot < m_handlersCommited) { 86 SignalHandler(handler, m_state);
108 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
109 SignalHandler(m_handlers[slot], m_state);
110 }
111 hp = m_handlerPointer;
112 slot = hp +1 ;
113 }
114
115
116 if (m_extraHandlers != null) {
117 THandler handler;
118 while (m_extraHandlers.TryDequeue(out handler))
119 SignalHandler(handler, m_state);
120 }
121 } 87 }
122 88
123 #endregion 89 #endregion
124 90
125 protected abstract Signal GetResolveSignal(); 91 protected abstract Signal GetFulfillSignal();
126 92
127 #region synchronization traits 93 #region synchronization traits
128 protected void WaitResult(int timeout) { 94 protected void WaitResult(int timeout) {
129 if (!(IsResolved || GetResolveSignal().Wait(timeout))) 95 if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
130 throw new TimeoutException(); 96 throw new TimeoutException();
131 97
132 switch (m_state) { 98 if (IsRejected)
133 case SUCCEEDED_STATE: 99 Rethrow();
134 return; 100 }
135 case CANCELLED_STATE: 101
136 throw new OperationCanceledException("The operation has been cancelled", m_error); 102 protected void Rethrow() {
137 case REJECTED_STATE: 103 Debug.Assert(m_error != null);
138 throw new TargetInvocationException(m_error); 104 if (m_error is OperationCanceledException)
139 default: 105 throw new OperationCanceledException("Operation cancelled", m_error);
140 throw new ApplicationException(String.Format("The promise state {0} is invalid", m_state)); 106 else
141 } 107 throw new TargetInvocationException(m_error);
142 } 108 }
143 #endregion 109 #endregion
144 110
145 #region handlers managment 111 #region handlers managment
146 112
148 114
149 if (m_state > 1) { 115 if (m_state > 1) {
150 // the promise is in the resolved state, just invoke the handler 116 // the promise is in the resolved state, just invoke the handler
151 SignalHandler(handler, m_state); 117 SignalHandler(handler, m_state);
152 } else { 118 } else {
153 var slot = Interlocked.Increment(ref m_handlersCount) - 1; 119 if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
120 if (m_extraHandlers == null)
121 // compare-exchange will fprotect from loosing already created queue
122 Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
123 m_extraHandlers.Enqueue(handler);
124 }
154 125
155 if (slot < RESERVED_HANDLERS_COUNT) { 126 if (m_state > 1 && TryDequeueHandler(out handler))
156
157 if (slot == 0) {
158 m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
159 } else {
160 while (m_handlers == null)
161 Thread.MemoryBarrier();
162 }
163
164 m_handlers[slot] = handler;
165
166 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
167 }
168
169 if (m_state > 1) {
170 do {
171 var hp = m_handlerPointer;
172 slot = hp + 1;
173 if (slot < m_handlersCommited) {
174 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
175 continue;
176 SignalHandler(m_handlers[slot], m_state);
177 }
178 break;
179 } while(true);
180 }
181 } else {
182 if (slot == RESERVED_HANDLERS_COUNT) {
183 m_extraHandlers = new SimpleAsyncQueue<THandler>();
184 } else {
185 while (m_extraHandlers == null)
186 Thread.MemoryBarrier();
187 }
188
189 m_extraHandlers.Enqueue(handler);
190
191 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
192 // if the promise have been resolved while we was adding the handler to the queue 127 // if the promise have been resolved while we was adding the handler to the queue
193 // we can't guarantee that someone is still processing it 128 // we can't guarantee that someone is still processing it
194 // therefore we need to fetch a handler from the queue and execute it 129 // therefore we need to fetch a handler from the queue and execute it
195 // note that fetched handler may be not the one that we have added 130 // note that fetched handler may be not the one that we have added
196 // even we can fetch no handlers at all :) 131 // even we can fetch no handlers at all :)
197 SignalHandler(handler, m_state); 132 SignalHandler(handler, m_state);
198 }
199 } 133 }
134
135 }
136
137 bool TryDequeueHandler(out THandler handler) {
138 handler = Interlocked.Exchange(ref m_handler, null);
139 if (handler != null)
140 return true;
141 return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
200 } 142 }
201 143
202 #endregion 144 #endregion
203 145
204 #region IPromise implementation 146 #region IPromise implementation
205 147
206 public bool IsResolved { 148 public bool IsFulfilled {
207 get { 149 get {
208 Thread.MemoryBarrier(); 150 return m_state > TRANSITIONAL_STATE;
209 return m_state > 1;
210 } 151 }
211 } 152 }
212 153
213 public bool IsCancelled { 154 public bool IsRejected {
214 get { 155 get {
215 Thread.MemoryBarrier(); 156 return m_state == REJECTED_STATE;
216 return m_state == CANCELLED_STATE;
217 } 157 }
218 } 158 }
219 159
220 #endregion 160 #endregion
221 161
222 public Exception Error { 162 public Exception RejectReason {
223 get { 163 get {
224 return m_error; 164 return m_error;
225 } 165 }
226 } 166 }
227 167
228 public bool CancelOperationIfRequested() {
229 if (IsCancellationRequested) {
230 CancelOperation(CancellationReason);
231 return true;
232 }
233 return false;
234 }
235
236 public virtual void CancelOperation(Exception reason) {
237 SetCancelled(reason);
238 }
239
240 public void CancellationRequested(Action<Exception> handler) {
241 Safe.ArgumentNotNull(handler, "handler");
242 if (IsCancellationRequested)
243 handler(CancellationReason);
244
245 if (m_cancelationHandlers == null)
246 Interlocked.CompareExchange(ref m_cancelationHandlers, new SimpleAsyncQueue<Action<Exception>>(), null);
247
248 m_cancelationHandlers.Enqueue(handler);
249
250 if (IsCancellationRequested && m_cancelationHandlers.TryDequeue(out handler))
251 // TryDeque implies MemoryBarrier()
252 handler(m_cancelationReason);
253 }
254
255 public bool IsCancellationRequested {
256 get {
257 do {
258 if (m_cancelRequest == CANCEL_NOT_REQUESTED)
259 return false;
260 if (m_cancelRequest == CANCEL_REQUESTED)
261 return true;
262 Thread.MemoryBarrier();
263 } while(true);
264 }
265 }
266
267 public Exception CancellationReason {
268 get {
269 do {
270 Thread.MemoryBarrier();
271 } while(m_cancelRequest == CANCEL_REQUESTING);
272
273 return m_cancelationReason;
274 }
275 }
276
277 #region ICancellable implementation
278
279 public void Cancel() {
280 Cancel(null);
281 }
282
283 public void Cancel(Exception reason) {
284 if (CANCEL_NOT_REQUESTED == Interlocked.CompareExchange(ref m_cancelRequest, CANCEL_REQUESTING, CANCEL_NOT_REQUESTED)) {
285 m_cancelationReason = reason;
286 m_cancelRequest = CANCEL_REQUESTED;
287 if (m_cancelationHandlers != null) {
288 Action<Exception> handler;
289 while (m_cancelationHandlers.TryDequeue(out handler))
290 handler(m_cancelationReason);
291 }
292 }
293 }
294
295 #endregion
296 } 168 }
297 } 169 }
298 170