119
|
1 using System;
|
|
2 using Implab.Parallels;
|
|
3 using System.Threading;
|
|
4 using System.Reflection;
|
|
5
|
|
6 namespace Implab {
|
|
7 public abstract class AbstractPromise<THandler> {
|
|
8
|
|
// Values stored in m_state. Every value greater than TRANSITIONAL_STATE
// is treated as "resolved" by the rest of the class (see the m_state > 1 checks).

// Nobody has started resolving the promise yet.
private const int UNRESOLVED_SATE = 0;
// A thread has won the right to resolve the promise but has not published the final state yet.
private const int TRANSITIONAL_STATE = 1;
// The promise has been resolved successfully.
private const int SUCCEEDED_STATE = 2;
// The promise has been resolved with an error (see m_error).
private const int REJECTED_STATE = 3;
// The promise has been cancelled.
private const int CANCELLED_STATE = 4;

// Number of handlers kept in the fixed-size m_handlers array; handlers
// registered beyond this count are stored in m_extraHandlers.
private const int RESERVED_HANDLERS_COUNT = 4;

// Current state, changed only through Interlocked.CompareExchange.
private int m_state;
// The error assigned by SetError before the promise becomes rejected.
private Exception m_error;
// Total number of slots handed out by AddHandler (incremented atomically).
private int m_handlersCount;

// Storage for the first RESERVED_HANDLERS_COUNT handlers.
private readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
// Overflow storage, created by the thread which receives slot RESERVED_HANDLERS_COUNT.
private MTQueue<THandler> m_extraHandlers;
// Index of the last slot of m_handlers which has been claimed for invocation, -1 when none.
private int m_handlerPointer = -1;
// Number of leading slots of m_handlers whose handler has been stored and published.
private int m_handlersCommited;
|
119
|
25
|
|
#region state management
|
|
/// <summary>
/// Atomically moves the promise from the unresolved state to the transitional state.
/// </summary>
/// <returns><c>true</c> if the calling thread performed the switch; <c>false</c> if the
/// promise was not in the unresolved state.</returns>
bool BeginTransit() {
    var previous = Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
    return previous == UNRESOLVED_SATE;
}
|
|
30
|
|
/// <summary>
/// Atomically replaces the transitional state with the specified final state.
/// </summary>
/// <param name="state">The state to publish.</param>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
void CompleteTransit(int state) {
    var previous = Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE);
    if (previous == TRANSITIONAL_STATE)
        return;

    throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
}
|
|
35
|
|
/// <summary>
/// Spins until the promise leaves the transitional state.
/// </summary>
void WaitTransition() {
    for (;;) {
        if (m_state != TRANSITIONAL_STATE)
            return;
        // full fence on every iteration, then m_state is read again
        Thread.MemoryBarrier();
    }
}
|
|
41
|
|
/// <summary>
/// Starts resolving the promise by switching it to the transitional state.
/// </summary>
/// <remarks>
/// If the promise can't be switched, the method waits until any transition in progress
/// is finished and then returns silently when the promise turned out to be cancelled.
/// NOTE(review): in that case a subsequent <see cref="EndSetResult"/> throws because the
/// promise is no longer in the transitional state - confirm that callers handle it.
/// </remarks>
/// <exception cref="InvalidOperationException">The promise is already resolved and it isn't cancelled.</exception>
protected void BeginSetResult() {
    if (BeginTransit())
        return;

    WaitTransition();

    if (m_state == CANCELLED_STATE)
        return;

    throw new InvalidOperationException("The promise is already resolved");
}
|
|
49
|
|
/// <summary>
/// Finishes resolving the promise: publishes the succeeded state and then
/// notifies the registered handlers.
/// </summary>
/// <exception cref="InvalidOperationException">The promise is not in the transitional state.</exception>
protected void EndSetResult() {
    // publish the final state first, the handlers are signalled only after that
    CompleteTransit(SUCCEEDED_STATE);
    OnSuccess();
}
|
|
54
|
|
55
|
|
56
|
|
/// <summary>
/// Resolves the promise by reporting an error.
/// </summary>
/// <remarks>
/// Since the promise has to work in a multithreaded environment, several threads may
/// report an error at the same time; only the first error is used as the result,
/// the others are ignored.
/// </remarks>
/// <param name="error">The exception which occurred during the operation.</param>
/// <exception cref="InvalidOperationException">The promise is already resolved successfully.</exception>
protected void SetError(Exception error) {
    if (!BeginTransit()) {
        // someone else is resolving or has resolved the promise
        WaitTransition();
        if (m_state == SUCCEEDED_STATE)
            throw new InvalidOperationException("The promise is already resolved");
        return;
    }

    // a PromiseTransientException is replaced with its inner exception
    Exception reason = error;
    if (error is PromiseTransientException)
        reason = error.InnerException;

    m_error = reason;
    CompleteTransit(REJECTED_STATE);
    OnError();
}
|
|
78
|
|
/// <summary>
/// Cancels the operation if it is possible.
/// </summary>
/// <remarks>Use the <see cref="IsCancelled"/> property to find out whether the operation has been cancelled.</remarks>
protected void SetCancelled() {
    // nothing to do when the promise is already resolved or is being resolved
    if (!BeginTransit())
        return;

    CompleteTransit(CANCELLED_STATE);
    OnCancelled();
}
|
|
89
|
|
/// <summary>
/// Called for each handler when the promise is in the succeeded state.
/// </summary>
/// <param name="handler">The handler to notify.</param>
protected abstract void SignalSuccess(THandler handler);

/// <summary>
/// Called for each handler when the promise is in the rejected state.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <param name="error">The error stored in the promise.</param>
protected abstract void SignalError(THandler handler, Exception error);

/// <summary>
/// Called for each handler when the promise is in the cancelled state.
/// </summary>
/// <param name="handler">The handler to notify.</param>
protected abstract void SignalCancelled(THandler handler);
|
|
95
|
|
/// <summary>
/// Signals success to the handlers which have been registered so far.
/// </summary>
void OnSuccess() {
    // claim committed slots one by one; a slot is processed only by the thread
    // which managed to advance m_handlerPointer to it
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalSuccess(m_handlers[next]);
    }

    if (m_extraHandlers == null)
        return;

    // drain the overflow queue
    THandler pending;
    while (m_extraHandlers.TryDequeue(out pending))
        SignalSuccess(pending);
}
|
|
114
|
|
/// <summary>
/// Signals the stored error to the handlers which have been registered so far.
/// </summary>
void OnError() {
    // claim committed slots one by one; a slot is processed only by the thread
    // which managed to advance m_handlerPointer to it
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalError(m_handlers[next], m_error);
    }

    if (m_extraHandlers == null)
        return;

    // drain the overflow queue
    THandler pending;
    while (m_extraHandlers.TryDequeue(out pending))
        SignalError(pending, m_error);
}
|
|
132
|
|
/// <summary>
/// Signals cancellation to the handlers which have been registered so far.
/// </summary>
void OnCancelled() {
    // claim committed slots one by one; a slot is processed only by the thread
    // which managed to advance m_handlerPointer to it
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            break;
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed)
            SignalCancelled(m_handlers[next]);
    }

    if (m_extraHandlers == null)
        return;

    // drain the overflow queue
    THandler pending;
    while (m_extraHandlers.TryDequeue(out pending))
        SignalCancelled(pending);
}
|
|
150
|
|
151 #endregion
|
|
152
|
|
/// <summary>
/// Subscribes a callback to the specified promise events.
/// </summary>
/// <remarks>
/// NOTE(review): the exact contract is defined by the implementors; WaitResult relies on
/// the callback being invoked when the promise gets resolved - confirm against subclasses.
/// </remarks>
/// <param name="events">The events the callback is interested in.</param>
/// <param name="handler">The callback to invoke.</param>
protected abstract void Listen(PromiseEventType events, Action handler);
|
|
154
|
|
155 #region synchronization traits
|
|
/// <summary>
/// Blocks the calling thread until the promise is resolved and then reports the outcome.
/// </summary>
/// <param name="timeout">The value passed to <see cref="Monitor.Wait(object,int)"/> for each wait.</param>
/// <exception cref="TimeoutException">The wait has timed out before the promise was resolved.</exception>
/// <exception cref="OperationCanceledException">The promise is cancelled.</exception>
/// <exception cref="TargetInvocationException">The promise is rejected; the stored error is the inner exception.</exception>
/// <exception cref="ApplicationException">The promise is in an unexpected state.</exception>
protected void WaitResult(int timeout) {
    if (!IsResolved) {
        var gate = new object();

        // wake the waiting thread up on any promise event
        Listen(PromiseEventType.All, () => {
            lock (gate) {
                Monitor.Pulse(gate);
            }
        });

        lock (gate) {
            // the state is rechecked under the lock, so a pulse sent before
            // this point is not required to leave the loop
            while (!IsResolved) {
                var signalled = Monitor.Wait(gate, timeout);
                if (!signalled)
                    throw new TimeoutException();
            }
        }
    }

    var state = m_state;

    if (state == SUCCEEDED_STATE)
        return;
    if (state == CANCELLED_STATE)
        throw new OperationCanceledException();
    if (state == REJECTED_STATE)
        throw new TargetInvocationException(m_error);

    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
}
|
|
185 #endregion
|
|
186
|
|
#region handlers management
|
|
188
|
|
/// <summary>
/// Registers a handler; if the promise is already resolved the handler is invoked instead of being stored.
/// </summary>
/// <remarks>
/// The first RESERVED_HANDLERS_COUNT handlers are stored in the m_handlers array,
/// the others are placed into the m_extraHandlers queue.
/// </remarks>
/// <param name="handler">The handler to register.</param>
protected void AddHandler(THandler handler) {
    if (m_state > 1) {
        // the promise is in the resolved state, just invoke the handler
        InvokeHandler(handler);
        return;
    }

    // reserve a slot for the handler
    var index = Interlocked.Increment(ref m_handlersCount) - 1;

    if (index >= RESERVED_HANDLERS_COUNT) {
        if (index == RESERVED_HANDLERS_COUNT) {
            // the owner of the first overflow slot creates the queue
            m_extraHandlers = new MTQueue<THandler>();
        } else {
            // the others wait until the queue becomes visible
            while (m_extraHandlers == null)
                Thread.MemoryBarrier();
        }

        m_extraHandlers.Enqueue(handler);

        // if the promise has been resolved while we were adding the handler to the queue
        // we can't guarantee that someone is still processing it,
        // therefore we need to fetch a handler from the queue and execute it;
        // note that the fetched handler may be not the one we have added,
        // we may even fetch no handlers at all
        if (m_state > 1 && m_extraHandlers.TryDequeue(out handler))
            InvokeHandler(handler);
        return;
    }

    m_handlers[index] = handler;

    // publish the slot; slots are committed strictly in order, so spin
    // until all preceding slots are committed
    while (Interlocked.CompareExchange(ref m_handlersCommited, index + 1, index) != index) {
    }

    if (m_state <= 1)
        return;

    // the promise has been resolved in the meantime: try to claim
    // and invoke one committed handler which nobody has claimed yet
    for (;;) {
        var claimed = m_handlerPointer;
        var next = claimed + 1;
        if (next >= m_handlersCommited)
            return;
        if (Interlocked.CompareExchange(ref m_handlerPointer, next, claimed) == claimed) {
            InvokeHandler(m_handlers[next]);
            return;
        }
    }
}
|
|
235
|
|
/// <summary>
/// Notifies a single handler about the outcome of the promise according to the current state.
/// </summary>
/// <param name="handler">The handler to notify.</param>
/// <exception cref="InvalidOperationException">The promise is not in the succeeded,
/// cancelled or rejected state.</exception>
protected void InvokeHandler(THandler handler) {
    // take one snapshot so the state reported in the error message
    // is exactly the one the dispatch was made on
    var state = m_state;
    switch (state) {
        case SUCCEEDED_STATE:
            SignalSuccess(handler);
            break;
        case CANCELLED_STATE:
            SignalCancelled(handler);
            break;
        case REJECTED_STATE:
            SignalError(handler, m_error);
            break;
        default:
            // a specific exception type instead of the bare System.Exception;
            // it is still caught by handlers written for Exception
            throw new InvalidOperationException(String.Format("Invalid promise state {0}", state));
    }
}
|
|
251
|
|
252 #endregion
|
|
253
|
|
254 #region IPromise implementation
|
|
255
|
|
/// <summary>
/// Waits for the promise to be resolved using the specified timeout.
/// </summary>
/// <param name="timeout">The timeout handed over to <c>WaitResult</c>.</param>
public void Join(int timeout) {
    // all waiting and error reporting is done by WaitResult
    WaitResult(timeout);
}
|
|
259
|
|
/// <summary>
/// Waits for the promise to be resolved; <c>-1</c> is passed to <c>WaitResult</c> as the timeout.
/// </summary>
public void Join() {
    const int noTimeout = -1;
    WaitResult(noTimeout);
}
|
|
263
|
|
/// <summary>
/// Gets a value indicating whether the promise has reached one of the final states
/// (succeeded, rejected or cancelled).
/// </summary>
public bool IsResolved {
    get {
        // full fence before the state is read
        Thread.MemoryBarrier();
        return TRANSITIONAL_STATE < m_state;
    }
}
|
|
270
|
|
/// <summary>
/// Gets a value indicating whether the promise is in the cancelled state.
/// </summary>
public bool IsCancelled {
    get {
        // full fence before the state is read
        Thread.MemoryBarrier();
        return CANCELLED_STATE == m_state;
    }
}
|
|
277
|
|
278 #endregion
|
|
279
|
|
280 #region ICancellable implementation
|
|
281
|
|
/// <summary>
/// Requests the cancellation of the promise by calling <c>SetCancelled</c>.
/// </summary>
/// <remarks>The call has no effect when the promise is already resolved or is being resolved.</remarks>
public void Cancel() {
    SetCancelled();
}
|
|
285
|
|
286 #endregion
|
|
287 }
|
|
288 }
|
|
289
|