Mercurial > pub > ImplabNet
comparison Implab/AbstractPromise.cs @ 144:8c0b95069066 v2
DRAFT: refactoring
author | cin |
---|---|
date | Fri, 06 Mar 2015 15:45:26 +0300 |
parents | 16f926ee499d |
children | 706fccb85524 |
comparison
equal
deleted
inserted
replaced
143:16f926ee499d | 144:8c0b95069066 |
---|---|
1 using System; | 1 using System; |
2 using Implab.Parallels; | 2 using Implab.Parallels; |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | 3 |
6 namespace Implab { | 4 namespace Implab { |
7 public abstract class AbstractPromise<THandler> { | 5 public abstract class AbstractPromise : AbstractEvent<AbstractPromise.HandlerDescriptor>, IPromise { |
6 public struct HandlerDescriptor { | |
7 readonly Action m_handler; | |
8 readonly Action<Exception> m_error; | |
9 readonly Action<Exception> m_cancel; | |
10 readonly PromiseEventType m_mask; | |
8 | 11 |
9 const int UNRESOLVED_SATE = 0; | 12 public HandlerDescriptor(Action success, Action<Exception> error, Action<Exception> cancel) { |
10 const int TRANSITIONAL_STATE = 1; | 13 m_handler = success; |
11 const int SUCCEEDED_STATE = 2; | 14 m_error = error; |
12 const int REJECTED_STATE = 3; | 15 m_cancel = cancel; |
13 const int CANCELLED_STATE = 4; | 16 m_mask = PromiseEventType.Success; |
17 } | |
14 | 18 |
15 const int RESERVED_HANDLERS_COUNT = 4; | 19 public HandlerDescriptor(Action handler, PromiseEventType mask) { |
20 m_handler = handler; | |
21 m_mask = mask; | |
22 } | |
16 | 23 |
17 int m_state; | 24 public void SignalSuccess() { |
18 Exception m_error; | 25 if (m_mask & PromiseEventType.Success && m_handler != null) { |
19 int m_handlersCount; | 26 try { |
27 m_handler(); | |
28 } catch (Exception err) { | |
29 // avoid calling handler twice in case of error | |
30 if (m_error != null) | |
31 SignalError(err); | |
32 } | |
33 } | |
34 } | |
20 | 35 |
21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | 36 public void SignalError(Exception err) { |
22 MTQueue<THandler> m_extraHandlers; | 37 if (m_error != null) { |
23 int m_handlerPointer = -1; | 38 try { |
24 int m_handlersCommited; | 39 m_error(err); |
40 // Analysis disable once EmptyGeneralCatchClause | |
41 } catch { | |
42 } | |
43 } else if (m_mask & PromiseEventType.Error && m_handler != null) { | |
44 try { | |
45 m_handler(); | |
46 // Analysis disable once EmptyGeneralCatchClause | |
47 } catch { | |
48 } | |
49 } | |
50 } | |
25 | 51 |
26 #region state managment | 52 public void SignalCancel(Exception reason) { |
27 bool BeginTransit() { | 53 if (m_cancel != null) { |
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | 54 try { |
29 } | 55 m_cancel(reason); |
30 | 56 } catch (Exception err) { |
31 void CompleteTransit(int state) { | 57 SignalError(err); |
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | 58 } |
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | 59 } else if (m_mask & PromiseEventType.Cancelled && m_handler != null) { |
34 } | 60 try { |
35 | 61 m_handler(); |
36 void WaitTransition() { | 62 // Analysis disable once EmptyGeneralCatchClause |
37 while (m_state == TRANSITIONAL_STATE) { | 63 } catch { |
38 Thread.MemoryBarrier(); | 64 } |
65 } | |
39 } | 66 } |
40 } | 67 } |
41 | 68 |
42 protected bool BeginSetResult() { | 69 |
43 if (!BeginTransit()) { | 70 #region implemented abstract members of AbstractPromise |
44 WaitTransition(); | 71 |
45 if (m_state != CANCELLED_STATE) | 72 protected override void SignalSuccess(HandlerDescriptor handler) { |
46 throw new InvalidOperationException("The promise is already resolved"); | 73 handler.SignalSuccess(); |
47 return false; | |
48 } | |
49 return true; | |
50 } | 74 } |
51 | 75 |
52 protected void EndSetResult() { | 76 protected override void SignalError(HandlerDescriptor handler, Exception error) { |
53 CompleteTransit(SUCCEEDED_STATE); | 77 handler.SignalError(error); |
54 OnSuccess(); | |
55 } | 78 } |
56 | 79 |
57 | 80 protected override void SignalCancelled(HandlerDescriptor handler, Exception reason) { |
58 | 81 handler.SignalCancel(reason); |
59 /// <summary> | |
60 /// Выполняет обещание, сообщая об ошибке | |
61 /// </summary> | |
62 /// <remarks> | |
63 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
64 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
65 /// будут проигнорированы. | |
66 /// </remarks> | |
67 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
68 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
69 protected void SetError(Exception error) { | |
70 if (BeginTransit()) { | |
71 if (error is OperationCanceledException) { | |
72 CompleteTransit(CANCELLED_STATE); | |
73 m_error = error.InnerException; | |
74 OnCancelled(); | |
75 } else { | |
76 m_error = error is PromiseTransientException ? error.InnerException : error; | |
77 CompleteTransit(REJECTED_STATE); | |
78 OnError(); | |
79 } | |
80 } else { | |
81 WaitTransition(); | |
82 if (m_state == SUCCEEDED_STATE) | |
83 throw new InvalidOperationException("The promise is already resolved"); | |
84 } | |
85 } | 82 } |
86 | 83 |
87 /// <summary> | 84 protected override Signal GetResolveSignal() { |
88 /// Отменяет операцию, если это возможно. | 85 var signal = new Signal(); |
89 /// </summary> | 86 On(signal.Set, PromiseEventType.All); |
90 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
91 protected void SetCancelled(Exception reason) { | |
92 if (BeginTransit()) { | |
93 m_error = reason; | |
94 CompleteTransit(CANCELLED_STATE); | |
95 OnCancelled(); | |
96 } | |
97 } | |
98 | |
99 protected abstract void SignalSuccess(THandler handler); | |
100 | |
101 protected abstract void SignalError(THandler handler, Exception error); | |
102 | |
103 protected abstract void SignalCancelled(THandler handler, Exception reason); | |
104 | |
105 void OnSuccess() { | |
106 var hp = m_handlerPointer; | |
107 var slot = hp +1 ; | |
108 while (slot < m_handlersCommited) { | |
109 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
110 SignalSuccess(m_handlers[slot]); | |
111 } | |
112 hp = m_handlerPointer; | |
113 slot = hp +1 ; | |
114 } | |
115 | |
116 | |
117 if (m_extraHandlers != null) { | |
118 THandler handler; | |
119 while (m_extraHandlers.TryDequeue(out handler)) | |
120 SignalSuccess(handler); | |
121 } | |
122 } | |
123 | |
124 void OnError() { | |
125 var hp = m_handlerPointer; | |
126 var slot = hp +1 ; | |
127 while (slot < m_handlersCommited) { | |
128 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
129 SignalError(m_handlers[slot],m_error); | |
130 } | |
131 hp = m_handlerPointer; | |
132 slot = hp +1 ; | |
133 } | |
134 | |
135 if (m_extraHandlers != null) { | |
136 THandler handler; | |
137 while (m_extraHandlers.TryDequeue(out handler)) | |
138 SignalError(handler, m_error); | |
139 } | |
140 } | |
141 | |
142 void OnCancelled() { | |
143 var hp = m_handlerPointer; | |
144 var slot = hp +1 ; | |
145 while (slot < m_handlersCommited) { | |
146 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
147 SignalCancelled(m_handlers[slot], m_error); | |
148 } | |
149 hp = m_handlerPointer; | |
150 slot = hp +1 ; | |
151 } | |
152 | |
153 if (m_extraHandlers != null) { | |
154 THandler handler; | |
155 while (m_extraHandlers.TryDequeue(out handler)) | |
156 SignalCancelled(handler, m_error); | |
157 } | |
158 } | 87 } |
159 | 88 |
160 #endregion | 89 #endregion |
161 | 90 |
162 protected abstract void Listen(PromiseEventType events, Action handler); | |
163 | 91 |
164 #region synchronization traits | 92 public Type PromiseType { |
165 protected void WaitResult(int timeout) { | 93 get { |
166 if (!IsResolved) { | 94 return typeof(void); |
167 var lk = new object(); | |
168 | |
169 Listen(PromiseEventType.All, () => { | |
170 lock(lk) { | |
171 Monitor.Pulse(lk); | |
172 } | |
173 }); | |
174 | |
175 lock (lk) { | |
176 while(!IsResolved) { | |
177 if(!Monitor.Wait(lk,timeout)) | |
178 throw new TimeoutException(); | |
179 } | |
180 } | |
181 | |
182 } | |
183 switch (m_state) { | |
184 case SUCCEEDED_STATE: | |
185 return; | |
186 case CANCELLED_STATE: | |
187 throw new OperationCanceledException(); | |
188 case REJECTED_STATE: | |
189 throw new TargetInvocationException(m_error); | |
190 default: | |
191 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
192 } | |
193 } | |
194 #endregion | |
195 | |
196 #region handlers managment | |
197 | |
198 protected void AddHandler(THandler handler) { | |
199 | |
200 if (m_state > 1) { | |
201 // the promise is in the resolved state, just invoke the handler | |
202 InvokeHandler(handler); | |
203 } else { | |
204 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
205 | |
206 if (slot < RESERVED_HANDLERS_COUNT) { | |
207 m_handlers[slot] = handler; | |
208 | |
209 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
210 } | |
211 | |
212 if (m_state > 1) { | |
213 do { | |
214 var hp = m_handlerPointer; | |
215 slot = hp + 1; | |
216 if (slot < m_handlersCommited) { | |
217 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
218 continue; | |
219 InvokeHandler(m_handlers[slot]); | |
220 } | |
221 break; | |
222 } while(true); | |
223 } | |
224 } else { | |
225 if (slot == RESERVED_HANDLERS_COUNT) { | |
226 m_extraHandlers = new MTQueue<THandler>(); | |
227 } else { | |
228 while (m_extraHandlers == null) | |
229 Thread.MemoryBarrier(); | |
230 } | |
231 | |
232 m_extraHandlers.Enqueue(handler); | |
233 | |
234 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
235 // if the promise have been resolved while we was adding the handler to the queue | |
236 // we can't guarantee that someone is still processing it | |
237 // therefore we need to fetch a handler from the queue and execute it | |
238 // note that fetched handler may be not the one that we have added | |
239 // even we can fetch no handlers at all :) | |
240 InvokeHandler(handler); | |
241 } | |
242 } | 95 } |
243 } | 96 } |
244 | 97 |
245 protected void InvokeHandler(THandler handler) { | 98 public IPromise On(Action success, Action<Exception> error, Action<Exception> cancel) { |
246 switch (m_state) { | 99 AddHandler(new HandlerDescriptor(success, error, cancel)); |
247 case SUCCEEDED_STATE: | 100 return this; |
248 SignalSuccess(handler); | |
249 break; | |
250 case CANCELLED_STATE: | |
251 SignalCancelled(handler, m_error); | |
252 break; | |
253 case REJECTED_STATE: | |
254 SignalError(handler, m_error); | |
255 break; | |
256 default: | |
257 throw new Exception(String.Format("Invalid promise state {0}", m_state)); | |
258 } | |
259 } | 101 } |
260 | 102 |
261 #endregion | 103 public IPromise On(Action success, Action<Exception> error) { |
104 AddHandler(new HandlerDescriptor(success, error, null)); | |
105 return this; | |
106 } | |
262 | 107 |
263 #region IPromise implementation | 108 public IPromise On(Action success) { |
109 AddHandler(new HandlerDescriptor(success, null, null)); | |
110 return this; | |
111 } | |
264 | 112 |
265 public void Join(int timeout) { | 113 public IPromise On(Action handler, PromiseEventType events) { |
266 WaitResult(timeout); | 114 AddHandler(new HandlerDescriptor(handler,events)); |
115 return this; | |
116 } | |
117 | |
118 public IPromise<T> Cast<T>() { | |
119 throw new InvalidCastException(); | |
267 } | 120 } |
268 | 121 |
269 public void Join() { | 122 public void Join() { |
270 WaitResult(-1); | 123 WaitResult(-1); |
271 } | 124 } |
272 | 125 |
273 public bool IsResolved { | 126 public void Join(int timeout) { |
274 get { | 127 WaitResult(timeout); |
275 Thread.MemoryBarrier(); | |
276 return m_state > 1; | |
277 } | |
278 } | 128 } |
279 | 129 |
280 public bool IsCancelled { | 130 protected void SetResult() { |
281 get { | 131 BeginSetResult(); |
282 Thread.MemoryBarrier(); | 132 EndSetResult(); |
283 return m_state == CANCELLED_STATE; | |
284 } | |
285 } | |
286 | |
287 #endregion | |
288 | |
289 public Exception Error { | |
290 get { | |
291 return m_error; | |
292 } | |
293 } | 133 } |
294 } | 134 } |
295 } | 135 } |
296 | 136 |