119
|
1 using System;
|
|
2 using Implab.Parallels;
|
|
3 using System.Threading;
|
|
4 using System.Reflection;
|
|
5
|
|
6 namespace Implab {
|
|
7 public abstract class AbstractPromise<THandler> {
|
|
8
|
|
9 const int UNRESOLVED_SATE = 0;
|
|
10 const int TRANSITIONAL_STATE = 1;
|
|
11 const int SUCCEEDED_STATE = 2;
|
|
12 const int REJECTED_STATE = 3;
|
|
13 const int CANCELLED_STATE = 4;
|
|
14
|
|
15 int m_state;
|
|
16 Exception m_error;
|
|
17
|
|
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();
|
|
19
|
|
20 #region state managment
|
|
21 bool BeginTransit() {
|
|
22 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE);
|
|
23 }
|
|
24
|
|
25 void CompleteTransit(int state) {
|
|
26 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
|
|
27 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
|
|
28 }
|
|
29
|
|
30 void WaitTransition() {
|
|
31 while (m_state == TRANSITIONAL_STATE) {
|
|
32 Thread.MemoryBarrier();
|
|
33 }
|
|
34 }
|
|
35
|
|
36 protected void BeginSetResult() {
|
|
37 if (!BeginTransit()) {
|
|
38 WaitTransition();
|
|
39 if (m_state != CANCELLED_STATE)
|
|
40 throw new InvalidOperationException("The promise is already resolved");
|
|
41 }
|
|
42 }
|
|
43
|
|
44 protected void EndSetResult() {
|
|
45 CompleteTransit(SUCCEEDED_STATE);
|
|
46 OnSuccess();
|
|
47 }
|
|
48
|
|
49
|
|
50
|
|
51 /// <summary>
|
|
52 /// Выполняет обещание, сообщая об ошибке
|
|
53 /// </summary>
|
|
54 /// <remarks>
|
|
55 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков
|
|
56 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные
|
|
57 /// будут проигнорированы.
|
|
58 /// </remarks>
|
|
59 /// <param name="error">Исключение возникшее при выполнении операции</param>
|
|
60 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
|
|
61 protected void SetError(Exception error) {
|
|
62 if (BeginTransit()) {
|
|
63 m_error = error is PromiseTransientException ? error.InnerException : error;
|
|
64 CompleteTransit(REJECTED_STATE);
|
|
65 OnError();
|
|
66 } else {
|
|
67 WaitTransition();
|
|
68 if (m_state == SUCCEEDED_STATE)
|
|
69 throw new InvalidOperationException("The promise is already resolved");
|
|
70 }
|
|
71 }
|
|
72
|
|
73 /// <summary>
|
|
74 /// Отменяет операцию, если это возможно.
|
|
75 /// </summary>
|
|
76 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
|
|
77 protected void SetCancelled() {
|
|
78 if (BeginTransit()) {
|
|
79 CompleteTransit(CANCELLED_STATE);
|
|
80 OnCancelled();
|
|
81 }
|
|
82 }
|
|
83
|
|
84 protected abstract void SignalSuccess(THandler handler);
|
|
85
|
|
86 protected abstract void SignalError(THandler handler, Exception error);
|
|
87
|
|
88 protected abstract void SignalCancelled(THandler handler);
|
|
89
|
|
90 void OnSuccess() {
|
|
91 THandler handler;
|
|
92 while (m_handlers.TryDequeue(out handler))
|
|
93 SignalSuccess(handler);
|
|
94 }
|
|
95
|
|
96 void OnError() {
|
|
97 THandler handler;
|
|
98 while (m_handlers.TryDequeue(out handler))
|
|
99 SignalError(handler,m_error);
|
|
100 }
|
|
101
|
|
102 void OnCancelled() {
|
|
103 THandler handler;
|
|
104 while (m_handlers.TryDequeue(out handler))
|
|
105 SignalCancelled(handler);
|
|
106 }
|
|
107
|
|
108 #endregion
|
|
109
|
|
110 protected abstract void Listen(PromiseEventType events, Action handler);
|
|
111
|
|
112 #region synchronization traits
|
|
113 protected void WaitResult(int timeout) {
|
|
114 if (!IsResolved) {
|
|
115 var lk = new object();
|
|
116
|
|
117 Listen(PromiseEventType.All, () => {
|
|
118 lock(lk) {
|
|
119 Monitor.Pulse(lk);
|
|
120 }
|
|
121 });
|
|
122
|
|
123 lock (lk) {
|
|
124 while(!IsResolved) {
|
|
125 if(!Monitor.Wait(lk,timeout))
|
|
126 throw new TimeoutException();
|
|
127 }
|
|
128 }
|
|
129
|
|
130 }
|
|
131 switch (m_state) {
|
|
132 case SUCCEEDED_STATE:
|
|
133 return;
|
|
134 case CANCELLED_STATE:
|
|
135 throw new OperationCanceledException();
|
|
136 case REJECTED_STATE:
|
|
137 throw new TargetInvocationException(m_error);
|
|
138 default:
|
|
139 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
|
|
140 }
|
|
141 }
|
|
142 #endregion
|
|
143
|
|
144 #region handlers managment
|
|
145
|
|
146 protected void AddHandler(THandler handler) {
|
|
147
|
|
148 if (IsResolved) {
|
|
149 InvokeHandler(handler);
|
|
150
|
|
151 } else {
|
|
152 // the promise is in the resolved state, just invoke the handler
|
|
153 m_handlers.Enqueue(handler);
|
|
154
|
|
155
|
|
156 if (IsResolved && m_handlers.TryDequeue(out handler))
|
|
157 // if the promise have been resolved while we was adding the handler to the queue
|
|
158 // we can't guarantee that someone is still processing it
|
|
159 // therefore we need to fetch a handler from the queue and execute it
|
|
160 // note that fetched handler may be not the one that we have added
|
|
161 // even we can fetch no handlers at all :)
|
|
162 InvokeHandler(handler);
|
|
163 }
|
|
164 }
|
|
165
|
|
166 protected void InvokeHandler(THandler handler) {
|
|
167 switch (m_state) {
|
|
168 case SUCCEEDED_STATE:
|
|
169 SignalSuccess(handler);
|
|
170 break;
|
|
171 case CANCELLED_STATE:
|
|
172 SignalCancelled(handler);
|
|
173 break;
|
|
174 case REJECTED_STATE:
|
|
175 SignalError(handler, m_error);
|
|
176 break;
|
|
177 default:
|
|
178 throw new Exception(String.Format("Invalid promise state {0}", m_state));
|
|
179 }
|
|
180 }
|
|
181
|
|
182 #endregion
|
|
183
|
|
184 #region IPromise implementation
|
|
185
|
|
186 public void Join(int timeout) {
|
|
187 WaitResult(timeout);
|
|
188 }
|
|
189
|
|
190 public void Join() {
|
|
191 WaitResult(-1);
|
|
192 }
|
|
193
|
|
194 public bool IsResolved {
|
|
195 get {
|
|
196 Thread.MemoryBarrier();
|
|
197 return m_state > 1;
|
|
198 }
|
|
199 }
|
|
200
|
|
201 public bool IsCancelled {
|
|
202 get {
|
|
203 Thread.MemoryBarrier();
|
|
204 return m_state == CANCELLED_STATE;
|
|
205 }
|
|
206 }
|
|
207
|
|
208 #endregion
|
|
209
|
|
210 #region ICancellable implementation
|
|
211
|
|
212 public void Cancel() {
|
|
213 SetCancelled();
|
|
214 }
|
|
215
|
|
216 #endregion
|
|
217 }
|
|
218 }
|
|
219
|