Mercurial > pub > ImplabNet
annotate Implab/AbstractPromise.cs @ 138:f75cfa58e3d4 v2
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
author | cin |
---|---|
date | Tue, 17 Feb 2015 18:16:26 +0300 |
parents | 671f60cd0250 |
children | 16f926ee499d |
rev | line source |
---|---|
119 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
7 public abstract class AbstractPromise<THandler> { | |
8 | |
// Promise life-cycle states kept in m_state. Every value greater than
// TRANSITIONAL_STATE denotes a resolved promise.
// NOTE: UNRESOLVED_SATE is misspelt, but the name is referenced by other members.
const int UNRESOLVED_SATE = 0,
    TRANSITIONAL_STATE = 1,
    SUCCEEDED_STATE = 2,
    REJECTED_STATE = 3,
    CANCELLED_STATE = 4;

// Capacity of the fixed handler array; later handlers go to m_extraHandlers.
const int RESERVED_HANDLERS_COUNT = 4;

// Current state, one of the *_STATE constants; switched with Interlocked.CompareExchange.
int m_state;

// Rejection error or cancellation reason; may be null.
Exception m_error;

// Number of handler slots handed out so far by AddHandler.
int m_handlersCount;

// Storage for the first RESERVED_HANDLERS_COUNT handlers.
readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];

// Overflow storage, created by the AddHandler call that receives slot RESERVED_HANDLERS_COUNT.
MTQueue<THandler> m_extraHandlers;

// Index of the last slot of m_handlers claimed for signalling; -1 while none is claimed.
int m_handlerPointer = -1;

// Number of leading slots of m_handlers that have been written and published.
int m_handlersCommited;
119 | 25 |
26 #region state managment | |
/// <summary>
/// Atomically switches the promise from the unresolved state to the transitional state.
/// </summary>
/// <returns>
/// <c>true</c> if this call performed the switch; <c>false</c> if the promise
/// was not in the unresolved state.
/// </returns>
bool BeginTransit() {
    var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
    return previous == UNRESOLVED_SATE;
}
30 | |
/// <summary>
/// Atomically replaces the transitional state with the specified final state.
/// </summary>
/// <param name="state">The state to store in place of the transitional one.</param>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
void CompleteTransit(int state) {
    var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
    if (previous == TRANSITIONAL_STATE)
        return;

    throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
35 | |
/// <summary>
/// Spins until the promise leaves the transitional state.
/// </summary>
void WaitTransition() {
    for (;;) {
        if (m_state != TRANSITIONAL_STATE)
            return;
        // The barrier forces m_state to be re-read on the next iteration.
        Thread.MemoryBarrier();
    }
}
41 | |
130
671f60cd0250
fixed Resove method bug when calling it on already cancelled promise
cin
parents:
125
diff
changeset
|
/// <summary>
/// Tries to start resolving the promise with a successful result.
/// </summary>
/// <returns>
/// <c>true</c> if the caller now owns the transition and must finish it with
/// <see cref="EndSetResult"/>; <c>false</c> if the promise has already been cancelled.
/// </returns>
/// <exception cref="InvalidOperationException">
/// The promise has already been resolved with a state other than cancelled.
/// </exception>
protected bool BeginSetResult() {
    if (BeginTransit())
        return true;

    // Someone else got there first: wait for them to publish the final state.
    WaitTransition();

    if (m_state == CANCELLED_STATE)
        return false;

    throw new InvalidOperationException("The promise is already resolved");
}
51 | |
/// <summary>
/// Finishes a transition started by <see cref="BeginSetResult"/>: marks the promise
/// as succeeded and then signals the registered handlers.
/// </summary>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
protected void EndSetResult() {
    // The final state is published first, the handlers are notified afterwards.
    CompleteTransit(SUCCEEDED_STATE);
    OnSuccess();
}
56 | |
57 | |
58 | |
/// <summary>
/// Resolves the promise with an error.
/// </summary>
/// <remarks>
/// The promise is meant to work in a multi-threaded environment, so several threads may
/// report an error at the same time; only the first one is used as the result, the others
/// are ignored.
/// An <see cref="OperationCanceledException"/> cancels the promise and its
/// <see cref="Exception.InnerException"/> becomes the cancellation reason.
/// A <c>PromiseTransientException</c> is unwrapped and its inner exception is stored instead.
/// </remarks>
/// <param name="error">The exception raised while performing the operation.</param>
/// <exception cref="InvalidOperationException">The promise has already been resolved successfully.</exception>
protected void SetError(Exception error) {
    if (BeginTransit()) {
        if (error is OperationCanceledException) {
            // Store the reason BEFORE the final state is published: once the state
            // is visible, other threads may read m_error straight away.
            m_error = error.InnerException;
            CompleteTransit(CANCELLED_STATE);
            OnCancelled();
        } else {
            m_error = error is PromiseTransientException ? error.InnerException : error;
            CompleteTransit(REJECTED_STATE);
            OnError();
        }
    } else {
        // Another thread is resolving (or has resolved) the promise.
        WaitTransition();
        if (m_state == SUCCEEDED_STATE)
            throw new InvalidOperationException("The promise is already resolved");
    }
}
86 | |
/// <summary>
/// Cancels the operation, if that is still possible.
/// </summary>
/// <remarks>
/// Use the <see cref="IsCancelled"/> property to find out whether the operation was
/// actually cancelled. Nothing happens when the promise is no longer unresolved.
/// </remarks>
/// <param name="reason">The reason for the cancellation; may be <c>null</c>.</param>
protected void SetCancelled(Exception reason) {
    if (!BeginTransit())
        return;

    // The reason is stored before the cancelled state becomes visible.
    m_error = reason;
    CompleteTransit(CANCELLED_STATE);
    OnCancelled();
}
98 | |
/// <summary>
/// Notifies a single handler that the promise has succeeded.
/// </summary>
/// <param name="handler">The handler to notify.</param>
protected abstract void SignalSuccess(THandler handler);

/// <summary>
/// Notifies a single handler that the promise has been rejected.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="error">The error stored in the promise.</param>
protected abstract void SignalError(THandler handler, Exception error);

/// <summary>
/// Notifies a single handler that the promise has been cancelled.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="reason">The cancellation reason stored in the promise; may be <c>null</c>.</param>
protected abstract void SignalCancelled(THandler handler, Exception reason);
119 | 104 |
/// <summary>
/// Signals success to every registered handler that has not been signalled yet.
/// </summary>
void OnSuccess() {
    NotifyHandlers(SignalSuccess);
}

/// <summary>
/// Signals the stored error to every registered handler that has not been signalled yet.
/// </summary>
void OnError() {
    NotifyHandlers(handler => SignalError(handler, m_error));
}

/// <summary>
/// Signals the cancellation (with the stored reason) to every registered handler
/// that has not been signalled yet.
/// </summary>
void OnCancelled() {
    NotifyHandlers(handler => SignalCancelled(handler, m_error));
}

/// <summary>
/// Shared dispatch loop: claims the committed slots of the fixed handler array one by one
/// and then drains the overflow queue, passing every obtained handler to <paramref name="signal"/>.
/// </summary>
/// <param name="signal">The notification to perform for each handler.</param>
void NotifyHandlers(Action<THandler> signal) {
    var hp = m_handlerPointer;
    var slot = hp + 1;
    while (slot < m_handlersCommited) {
        // A slot is signalled only by the thread that wins the compare-and-swap,
        // so a handler is never invoked twice even when AddHandler races with us.
        if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
            signal(m_handlers[slot]);
        }
        hp = m_handlerPointer;
        slot = hp + 1;
    }

    if (m_extraHandlers != null) {
        THandler handler;
        while (m_extraHandlers.TryDequeue(out handler))
            signal(handler);
    }
}
159 | |
160 #endregion | |
161 | |
/// <summary>
/// Subscribes a parameterless callback to the specified promise events.
/// </summary>
/// <remarks>
/// <see cref="WaitResult"/> relies on the callback being invoked once the promise is resolved.
/// </remarks>
/// <param name="events">The events the callback is subscribed to.</param>
/// <param name="handler">The callback to invoke.</param>
protected abstract void Listen(PromiseEventType events, Action handler);
163 | |
164 #region synchronization traits | |
/// <summary>
/// Blocks the calling thread until the promise is resolved, then reports the outcome.
/// </summary>
/// <param name="timeout">
/// The wait timeout in milliseconds, passed to <see cref="Monitor.Wait(object, int)"/>;
/// <c>-1</c> waits indefinitely.
/// </param>
/// <exception cref="TimeoutException">The promise was not resolved within the timeout.</exception>
/// <exception cref="OperationCanceledException">
/// The promise was cancelled; the cancellation reason, if any, is available as
/// <see cref="Exception.InnerException"/>.
/// </exception>
/// <exception cref="TargetInvocationException">
/// The promise was rejected; the original error is available as
/// <see cref="Exception.InnerException"/>.
/// </exception>
/// <exception cref="ApplicationException">The promise is in an unexpected state.</exception>
protected void WaitResult(int timeout) {
    if (!IsResolved) {
        var lk = new object();

        // The handler wakes the waiting thread up once the promise is resolved.
        Listen(PromiseEventType.All, () => {
            lock (lk) {
                Monitor.Pulse(lk);
            }
        });

        lock (lk) {
            // IsResolved is re-checked under the lock, so a pulse sent before
            // we start waiting is not lost.
            while (!IsResolved) {
                if (!Monitor.Wait(lk, timeout))
                    throw new TimeoutException();
            }
        }
    }

    switch (m_state) {
        case SUCCEEDED_STATE:
            return;
        case CANCELLED_STATE:
            // Surface the cancellation reason instead of dropping it; SetError reads
            // the reason from the same place when it unwraps a cancellation.
            throw new OperationCanceledException("The operation has been cancelled", m_error);
        case REJECTED_STATE:
            throw new TargetInvocationException(m_error);
        default:
            throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
    }
}
194 #endregion | |
195 | |
196 #region handlers managment | |
197 | |
/// <summary>
/// Registers a handler. If the promise is already resolved, or becomes resolved while the
/// handler is being registered, a pending handler is invoked from the calling thread.
/// </summary>
/// <param name="handler">The handler to register.</param>
protected void AddHandler(THandler handler) {
    if (m_state > 1) {
        // The promise is already resolved: just invoke the handler.
        InvokeHandler(handler);
        return;
    }

    var index = Interlocked.Increment(ref m_handlersCount) - 1;

    if (index >= RESERVED_HANDLERS_COUNT) {
        if (index == RESERVED_HANDLERS_COUNT) {
            // The first overflowing handler creates the queue...
            m_extraHandlers = new MTQueue<THandler>();
        } else {
            // ...the later ones spin until it becomes visible.
            while (m_extraHandlers == null)
                Thread.MemoryBarrier();
        }

        m_extraHandlers.Enqueue(handler);

        if (m_state <= 1)
            return;

        // If the promise was resolved while we were adding the handler to the queue,
        // we cannot guarantee that someone is still processing the queue, so we fetch
        // a handler from it and execute it ourselves. Note that the fetched handler may
        // not be the one we have just added, and we may fetch no handler at all.
        THandler pending;
        if (m_extraHandlers.TryDequeue(out pending))
            InvokeHandler(pending);
        return;
    }

    m_handlers[index] = handler;

    // Slots are committed strictly in order: wait until all previous slots are committed.
    while (index != Interlocked.CompareExchange(ref m_handlersCommited, index + 1, index)) {
    }

    if (m_state <= 1)
        return;

    // The promise was resolved in the meantime: claim and invoke at most one pending slot.
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;

        if (next >= m_handlersCommited)
            break;

        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed) {
            InvokeHandler(m_handlers[next]);
            break;
        }
        // Lost the race for this slot: retry with the updated pointer.
    }
}
244 | |
/// <summary>
/// Notifies a single handler according to the current (resolved) state of the promise.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <exception cref="InvalidOperationException">The promise is not in a resolved state.</exception>
protected void InvokeHandler(THandler handler) {
    // Read the state once so that the error message reports the value that was switched on.
    var state = m_state;

    switch (state) {
        case SUCCEEDED_STATE:
            SignalSuccess(handler);
            break;
        case CANCELLED_STATE:
            SignalCancelled(handler, m_error);
            break;
        case REJECTED_STATE:
            SignalError(handler, m_error);
            break;
        default:
            // A specific exception type instead of the bare System.Exception.
            throw new InvalidOperationException(String.Format("Invalid promise state {0}", state));
    }
}
260 | |
261 #endregion | |
262 | |
263 #region IPromise implementation | |
264 | |
/// <summary>
/// Blocks the calling thread until the promise is resolved or the timeout expires.
/// </summary>
/// <param name="timeout">The wait timeout in milliseconds.</param>
/// <exception cref="TimeoutException">The promise was not resolved within the timeout.</exception>
/// <exception cref="OperationCanceledException">The promise was cancelled.</exception>
/// <exception cref="System.Reflection.TargetInvocationException">
/// The promise was rejected; the original error is the inner exception.
/// </exception>
public void Join(int timeout) {
    WaitResult(timeout);
}
268 | |
/// <summary>
/// Blocks the calling thread until the promise is resolved, without a time limit.
/// </summary>
/// <exception cref="OperationCanceledException">The promise was cancelled.</exception>
/// <exception cref="System.Reflection.TargetInvocationException">
/// The promise was rejected; the original error is the inner exception.
/// </exception>
public void Join() {
    WaitResult(Timeout.Infinite);
}
272 | |
/// <summary>
/// Gets a value indicating whether the promise has reached a final state
/// (succeeded, rejected or cancelled).
/// </summary>
public bool IsResolved {
    get {
        Thread.MemoryBarrier();
        // All final states are numerically greater than the transitional one.
        return m_state > TRANSITIONAL_STATE;
    }
}
279 | |
/// <summary>
/// Gets a value indicating whether the promise has been cancelled.
/// </summary>
public bool IsCancelled {
    get {
        Thread.MemoryBarrier();
        var state = m_state;
        return state == CANCELLED_STATE;
    }
}
286 | |
287 #endregion | |
288 | |
289 #region ICancellable implementation | |
290 | |
/// <summary>
/// Cancels the promise without specifying a reason. Has no effect when the promise
/// is no longer unresolved.
/// </summary>
public void Cancel() {
    // No reason is supplied, so a null reason is recorded.
    SetCancelled(null);
}
f75cfa58e3d4
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin
parents:
130
diff
changeset
|
294 |
f75cfa58e3d4
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin
parents:
130
diff
changeset
|
/// <summary>
/// Cancels the promise, recording the reason for the cancellation. Has no effect when
/// the promise is no longer unresolved.
/// </summary>
/// <param name="reason">The reason for the cancellation; may be <c>null</c>.</param>
public void Cancel(Exception reason) {
    SetCancelled(reason);
}
298 | |
299 #endregion | |
138
f75cfa58e3d4
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin
parents:
130
diff
changeset
|
300 |
f75cfa58e3d4
added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
cin
parents:
130
diff
changeset
|
/// <summary>
/// Gets the exception stored in the promise: the error of a rejected promise or the
/// reason of a cancelled one. May be <c>null</c>.
/// </summary>
public Exception Error {
    get {
        var stored = m_error;
        return stored;
    }
}
119 | 306 } |
307 } | |
308 |