Mercurial > pub > ImplabNet
annotate Implab/AbstractPromise.cs @ 136:e9e7940c7d98 v2
shared locks + tests
author | cin |
---|---|
date | Mon, 16 Feb 2015 01:14:09 +0300 |
parents | 671f60cd0250 |
children | f75cfa58e3d4 |
rev | line source |
---|---|
119 | 1 using System; |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
7 public abstract class AbstractPromise<THandler> { | |
8 | |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
11 const int SUCCEEDED_STATE = 2; | |
12 const int REJECTED_STATE = 3; | |
13 const int CANCELLED_STATE = 4; | |
14 | |
125 | 15 const int RESERVED_HANDLERS_COUNT = 4; |
16 | |
119 | 17 int m_state; |
18 Exception m_error; | |
125 | 19 int m_handlersCount; |
119 | 20 |
125 | 21 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
22 MTQueue<THandler> m_extraHandlers; | |
23 int m_handlerPointer = -1; | |
24 int m_handlersCommited; | |
119 | 25 |
26 #region state managment | |
27 bool BeginTransit() { | |
28 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
29 } | |
30 | |
31 void CompleteTransit(int state) { | |
32 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
33 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
34 } | |
35 | |
36 void WaitTransition() { | |
37 while (m_state == TRANSITIONAL_STATE) { | |
38 Thread.MemoryBarrier(); | |
39 } | |
40 } | |
41 | |
130
671f60cd0250
fixed Resove method bug when calling it on already cancelled promise
cin
parents:
125
diff
changeset
|
42 protected bool BeginSetResult() { |
119 | 43 if (!BeginTransit()) { |
44 WaitTransition(); | |
45 if (m_state != CANCELLED_STATE) | |
46 throw new InvalidOperationException("The promise is already resolved"); | |
130
671f60cd0250
fixed Resove method bug when calling it on already cancelled promise
cin
parents:
125
diff
changeset
|
47 return false; |
119 | 48 } |
130
671f60cd0250
fixed Resove method bug when calling it on already cancelled promise
cin
parents:
125
diff
changeset
|
49 return true; |
119 | 50 } |
51 | |
52 protected void EndSetResult() { | |
53 CompleteTransit(SUCCEEDED_STATE); | |
54 OnSuccess(); | |
55 } | |
56 | |
57 | |
58 | |
59 /// <summary> | |
60 /// Выполняет обещание, сообщая об ошибке | |
61 /// </summary> | |
62 /// <remarks> | |
63 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
64 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
65 /// будут проигнорированы. | |
66 /// </remarks> | |
67 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
68 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
69 protected void SetError(Exception error) { | |
70 if (BeginTransit()) { | |
71 m_error = error is PromiseTransientException ? error.InnerException : error; | |
72 CompleteTransit(REJECTED_STATE); | |
73 OnError(); | |
74 } else { | |
75 WaitTransition(); | |
76 if (m_state == SUCCEEDED_STATE) | |
77 throw new InvalidOperationException("The promise is already resolved"); | |
78 } | |
79 } | |
80 | |
81 /// <summary> | |
82 /// Отменяет операцию, если это возможно. | |
83 /// </summary> | |
84 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
85 protected void SetCancelled() { | |
86 if (BeginTransit()) { | |
87 CompleteTransit(CANCELLED_STATE); | |
88 OnCancelled(); | |
89 } | |
90 } | |
91 | |
92 protected abstract void SignalSuccess(THandler handler); | |
93 | |
94 protected abstract void SignalError(THandler handler, Exception error); | |
95 | |
96 protected abstract void SignalCancelled(THandler handler); | |
97 | |
98 void OnSuccess() { | |
125 | 99 var hp = m_handlerPointer; |
100 var slot = hp +1 ; | |
101 while (slot < m_handlersCommited) { | |
102 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
103 SignalSuccess(m_handlers[slot]); | |
104 } | |
105 hp = m_handlerPointer; | |
106 slot = hp +1 ; | |
107 } | |
108 | |
109 | |
110 if (m_extraHandlers != null) { | |
111 THandler handler; | |
112 while (m_extraHandlers.TryDequeue(out handler)) | |
113 SignalSuccess(handler); | |
114 } | |
119 | 115 } |
116 | |
117 void OnError() { | |
125 | 118 var hp = m_handlerPointer; |
119 var slot = hp +1 ; | |
120 while (slot < m_handlersCommited) { | |
121 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
122 SignalError(m_handlers[slot],m_error); | |
123 } | |
124 hp = m_handlerPointer; | |
125 slot = hp +1 ; | |
126 } | |
127 | |
128 if (m_extraHandlers != null) { | |
129 THandler handler; | |
130 while (m_extraHandlers.TryDequeue(out handler)) | |
131 SignalError(handler, m_error); | |
132 } | |
119 | 133 } |
134 | |
135 void OnCancelled() { | |
125 | 136 var hp = m_handlerPointer; |
137 var slot = hp +1 ; | |
138 while (slot < m_handlersCommited) { | |
139 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
140 SignalCancelled(m_handlers[slot]); | |
141 } | |
142 hp = m_handlerPointer; | |
143 slot = hp +1 ; | |
144 } | |
145 | |
146 if (m_extraHandlers != null) { | |
147 THandler handler; | |
148 while (m_extraHandlers.TryDequeue(out handler)) | |
149 SignalCancelled(handler); | |
150 } | |
119 | 151 } |
152 | |
153 #endregion | |
154 | |
155 protected abstract void Listen(PromiseEventType events, Action handler); | |
156 | |
157 #region synchronization traits | |
158 protected void WaitResult(int timeout) { | |
159 if (!IsResolved) { | |
160 var lk = new object(); | |
161 | |
162 Listen(PromiseEventType.All, () => { | |
163 lock(lk) { | |
164 Monitor.Pulse(lk); | |
165 } | |
166 }); | |
167 | |
168 lock (lk) { | |
169 while(!IsResolved) { | |
170 if(!Monitor.Wait(lk,timeout)) | |
171 throw new TimeoutException(); | |
172 } | |
173 } | |
174 | |
175 } | |
176 switch (m_state) { | |
177 case SUCCEEDED_STATE: | |
178 return; | |
179 case CANCELLED_STATE: | |
180 throw new OperationCanceledException(); | |
181 case REJECTED_STATE: | |
182 throw new TargetInvocationException(m_error); | |
183 default: | |
184 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
185 } | |
186 } | |
187 #endregion | |
188 | |
189 #region handlers managment | |
190 | |
191 protected void AddHandler(THandler handler) { | |
192 | |
125 | 193 if (m_state > 1) { |
119 | 194 // the promise is in the resolved state, just invoke the handler |
125 | 195 InvokeHandler(handler); |
196 } else { | |
197 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | |
198 | |
199 if (slot < RESERVED_HANDLERS_COUNT) { | |
200 m_handlers[slot] = handler; | |
201 | |
202 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | |
203 } | |
119 | 204 |
125 | 205 if (m_state > 1) { |
206 do { | |
207 var hp = m_handlerPointer; | |
208 slot = hp + 1; | |
209 if (slot < m_handlersCommited) { | |
210 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | |
211 continue; | |
212 InvokeHandler(m_handlers[slot]); | |
213 } | |
214 break; | |
215 } while(true); | |
216 } | |
217 } else { | |
218 if (slot == RESERVED_HANDLERS_COUNT) { | |
219 m_extraHandlers = new MTQueue<THandler>(); | |
220 } else { | |
221 while (m_extraHandlers == null) | |
222 Thread.MemoryBarrier(); | |
223 } | |
119 | 224 |
125 | 225 m_extraHandlers.Enqueue(handler); |
226 | |
227 if (m_state > 1 && m_extraHandlers.TryDequeue(out handler)) | |
119 | 228 // if the promise have been resolved while we was adding the handler to the queue |
229 // we can't guarantee that someone is still processing it | |
230 // therefore we need to fetch a handler from the queue and execute it | |
231 // note that fetched handler may be not the one that we have added | |
232 // even we can fetch no handlers at all :) | |
233 InvokeHandler(handler); | |
125 | 234 } |
119 | 235 } |
236 } | |
237 | |
238 protected void InvokeHandler(THandler handler) { | |
239 switch (m_state) { | |
240 case SUCCEEDED_STATE: | |
241 SignalSuccess(handler); | |
242 break; | |
243 case CANCELLED_STATE: | |
244 SignalCancelled(handler); | |
245 break; | |
246 case REJECTED_STATE: | |
247 SignalError(handler, m_error); | |
248 break; | |
249 default: | |
250 throw new Exception(String.Format("Invalid promise state {0}", m_state)); | |
251 } | |
252 } | |
253 | |
254 #endregion | |
255 | |
256 #region IPromise implementation | |
257 | |
258 public void Join(int timeout) { | |
259 WaitResult(timeout); | |
260 } | |
261 | |
262 public void Join() { | |
263 WaitResult(-1); | |
264 } | |
265 | |
266 public bool IsResolved { | |
267 get { | |
268 Thread.MemoryBarrier(); | |
269 return m_state > 1; | |
270 } | |
271 } | |
272 | |
273 public bool IsCancelled { | |
274 get { | |
275 Thread.MemoryBarrier(); | |
276 return m_state == CANCELLED_STATE; | |
277 } | |
278 } | |
279 | |
280 #endregion | |
281 | |
282 #region ICancellable implementation | |
283 | |
284 public void Cancel() { | |
285 SetCancelled(); | |
286 } | |
287 | |
288 #endregion | |
289 } | |
290 } | |
291 |