comparison Implab/AbstractPromise.cs @ 119:2573b562e328 v2

Promises rewritten, added improved version of AsyncQueue
author cin
date Sun, 11 Jan 2015 19:13:02 +0300
parents
children f803565868a4
comparison
equal deleted inserted replaced
118:e046a94eecb1 119:2573b562e328
1 using System;
2 using Implab.Parallels;
3 using System.Threading;
4 using System.Reflection;
5
namespace Implab {
    /// <summary>
    /// Base class for promises. Holds the resolution state machine, the stored error and the
    /// queue of handlers which are signalled when the promise gets resolved.
    /// </summary>
    /// <typeparam name="THandler">
    /// Type of the handler descriptor kept in the queue. Derived classes define how a handler
    /// is signalled through <c>SignalSuccess</c>, <c>SignalError</c> and <c>SignalCancelled</c>.
    /// </typeparam>
    public abstract class AbstractPromise<THandler> {

        // State machine: UNRESOLVED -> TRANSITIONAL -> (SUCCEEDED | REJECTED | CANCELLED).
        // Every value greater than TRANSITIONAL_STATE is final; IsResolved relies on this ordering.
        const int UNRESOLVED_STATE = 0;
        const int TRANSITIONAL_STATE = 1;
        const int SUCCEEDED_STATE = 2;
        const int REJECTED_STATE = 3;
        const int CANCELLED_STATE = 4;

        int m_state;
        Exception m_error;

        readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>();

        #region state management

        /// <summary>
        /// Atomically moves the promise from the unresolved state to the transitional one.
        /// </summary>
        /// <returns><c>true</c> if the calling thread performed the switch and now owns the transition.</returns>
        bool BeginTransit() {
            return UNRESOLVED_STATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_STATE);
        }

        /// <summary>
        /// Atomically moves the promise from the transitional state to the specified final state.
        /// </summary>
        /// <param name="state">The final state to set.</param>
        /// <exception cref="InvalidOperationException">The promise isn't in the transitional state.</exception>
        void CompleteTransit(int state) {
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Spins until the promise leaves the transitional state.
        /// </summary>
        void WaitTransition() {
            while (m_state == TRANSITIONAL_STATE) {
                // the barrier forces m_state to be re-read on every iteration
                Thread.MemoryBarrier();
            }
        }

        /// <summary>
        /// Starts resolving the promise with a successful result. Must be followed by
        /// <see cref="EndSetResult"/>.
        /// </summary>
        /// <remarks>
        /// If the promise has already been cancelled the method returns normally without
        /// taking ownership of the transition; the subsequent <see cref="EndSetResult"/>
        /// does nothing in that case.
        /// </remarks>
        /// <exception cref="InvalidOperationException">The promise has already succeeded or has been rejected.</exception>
        protected void BeginSetResult() {
            if (BeginTransit())
                return;

            // someone else has started (or finished) the transition, wait for the final state
            WaitTransition();
            if (m_state != CANCELLED_STATE)
                throw new InvalidOperationException("The promise is already resolved");
        }

        /// <summary>
        /// Completes the transition started by <see cref="BeginSetResult"/>, marks the promise
        /// as succeeded and signals the queued handlers.
        /// </summary>
        /// <remarks>
        /// When the promise has been cancelled the call is ignored: <see cref="BeginSetResult"/>
        /// lets the caller through in that case, so the pair must not fail here.
        /// </remarks>
        /// <exception cref="InvalidOperationException">
        /// The promise is neither in the transitional state nor cancelled.
        /// </exception>
        protected void EndSetResult() {
            var previous = Interlocked.CompareExchange(ref m_state, SUCCEEDED_STATE, TRANSITIONAL_STATE);

            if (previous == TRANSITIONAL_STATE) {
                OnSuccess();
                return;
            }

            // the result of a cancelled promise is silently dropped
            if (previous == CANCELLED_STATE)
                return;

            throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
        }

        /// <summary>
        /// Resolves the promise with an error.
        /// </summary>
        /// <remarks>
        /// Since the promise is used in a multithreaded environment several threads may report
        /// an error at the same time; only the first one is stored as the result. An error
        /// reported to an already rejected or cancelled promise is ignored. If the error is a
        /// <c>PromiseTransientException</c> its inner exception is stored instead.
        /// </remarks>
        /// <param name="error">The exception raised by the operation.</param>
        /// <exception cref="InvalidOperationException">The promise has already succeeded.</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                // m_error is written before the final state is published by CompleteTransit
                m_error = error is PromiseTransientException ? error.InnerException : error;
                CompleteTransit(REJECTED_STATE);
                OnError();
            } else {
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Cancels the operation if it is still possible, i.e. the promise isn't resolved yet.
        /// </summary>
        /// <remarks>
        /// Use the <see cref="IsCancelled"/> property to find out whether the operation has
        /// actually been cancelled.
        /// </remarks>
        protected void SetCancelled() {
            if (BeginTransit()) {
                CompleteTransit(CANCELLED_STATE);
                OnCancelled();
            }
        }

        /// <summary>
        /// Notifies the specified handler that the promise has succeeded.
        /// </summary>
        /// <param name="handler">The handler to signal.</param>
        protected abstract void SignalSuccess(THandler handler);

        /// <summary>
        /// Notifies the specified handler that the promise has been rejected.
        /// </summary>
        /// <param name="handler">The handler to signal.</param>
        /// <param name="error">The error stored in the promise.</param>
        protected abstract void SignalError(THandler handler, Exception error);

        /// <summary>
        /// Notifies the specified handler that the promise has been cancelled.
        /// </summary>
        /// <param name="handler">The handler to signal.</param>
        protected abstract void SignalCancelled(THandler handler);

        // Drains the queue signalling success to every handler.
        void OnSuccess() {
            THandler handler;
            while (m_handlers.TryDequeue(out handler))
                SignalSuccess(handler);
        }

        // Drains the queue signalling the stored error to every handler.
        void OnError() {
            THandler handler;
            while (m_handlers.TryDequeue(out handler))
                SignalError(handler, m_error);
        }

        // Drains the queue signalling cancellation to every handler.
        void OnCancelled() {
            THandler handler;
            while (m_handlers.TryDequeue(out handler))
                SignalCancelled(handler);
        }

        #endregion

        /// <summary>
        /// Registers a parameterless callback for the specified events.
        /// </summary>
        /// <remarks>
        /// <see cref="WaitResult"/> relies on the callback being invoked once the promise is resolved.
        /// </remarks>
        /// <param name="events">The events to listen for.</param>
        /// <param name="handler">The callback.</param>
        protected abstract void Listen(PromiseEventType events, Action handler);

        #region synchronization traits

        /// <summary>
        /// Blocks the calling thread until the promise is resolved.
        /// </summary>
        /// <param name="timeout">
        /// The timeout passed to <see cref="Monitor.Wait(object,int)"/>, in milliseconds;
        /// <c>-1</c> means wait indefinitely.
        /// </param>
        /// <exception cref="TimeoutException">The promise wasn't resolved within the timeout.</exception>
        /// <exception cref="OperationCanceledException">The promise has been cancelled.</exception>
        /// <exception cref="TargetInvocationException">The promise has been rejected; the stored error is the inner exception.</exception>
        protected void WaitResult(int timeout) {
            if (!IsResolved) {
                var sync = new object();

                Listen(PromiseEventType.All, () => {
                    lock (sync) {
                        Monitor.Pulse(sync);
                    }
                });

                lock (sync) {
                    // the state is re-checked under the lock, so a pulse sent before
                    // we start waiting isn't lost
                    while (!IsResolved) {
                        if (!Monitor.Wait(sync, timeout))
                            throw new TimeoutException();
                    }
                }
            }

            switch (m_state) {
                case SUCCEEDED_STATE:
                    return;
                case CANCELLED_STATE:
                    throw new OperationCanceledException();
                case REJECTED_STATE:
                    throw new TargetInvocationException(m_error);
                default:
                    throw new ApplicationException(String.Format("Invalid promise state {0}", m_state));
            }
        }

        #endregion

        #region handlers management

        /// <summary>
        /// Adds the handler to the promise. If the promise is already resolved the handler is
        /// invoked immediately, otherwise it is queued until the promise gets resolved.
        /// </summary>
        /// <param name="handler">The handler to add.</param>
        protected void AddHandler(THandler handler) {
            if (IsResolved) {
                // the promise is already resolved, just invoke the handler
                InvokeHandler(handler);
            } else {
                // the promise isn't resolved yet, queue the handler
                m_handlers.Enqueue(handler);

                // If the promise has been resolved while we were adding the handler to the
                // queue, we can't guarantee that someone is still processing the queue.
                // Therefore we need to fetch a handler from the queue and execute it.
                // Note that the fetched handler may not be the one we have just added,
                // and we may even fetch no handler at all.
                if (IsResolved && m_handlers.TryDequeue(out handler))
                    InvokeHandler(handler);
            }
        }

        /// <summary>
        /// Signals the handler according to the current final state of the promise.
        /// </summary>
        /// <param name="handler">The handler to signal.</param>
        /// <exception cref="InvalidOperationException">The promise isn't in a final state.</exception>
        protected void InvokeHandler(THandler handler) {
            switch (m_state) {
                case SUCCEEDED_STATE:
                    SignalSuccess(handler);
                    break;
                case CANCELLED_STATE:
                    SignalCancelled(handler);
                    break;
                case REJECTED_STATE:
                    SignalError(handler, m_error);
                    break;
                default:
                    throw new InvalidOperationException(String.Format("Invalid promise state {0}", m_state));
            }
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Waits for the promise to be resolved, see <see cref="WaitResult"/> for the exceptions thrown.
        /// </summary>
        /// <param name="timeout">The timeout in milliseconds.</param>
        public void Join(int timeout) {
            WaitResult(timeout);
        }

        /// <summary>
        /// Waits for the promise to be resolved without a timeout, see <see cref="WaitResult"/>
        /// for the exceptions thrown.
        /// </summary>
        public void Join() {
            WaitResult(-1);
        }

        /// <summary>
        /// Gets a value indicating whether the promise has reached a final state
        /// (succeeded, rejected or cancelled).
        /// </summary>
        public bool IsResolved {
            get {
                Thread.MemoryBarrier();
                // all final states are greater than the transitional one
                return m_state > TRANSITIONAL_STATE;
            }
        }

        /// <summary>
        /// Gets a value indicating whether the promise has been cancelled.
        /// </summary>
        public bool IsCancelled {
            get {
                Thread.MemoryBarrier();
                return m_state == CANCELLED_STATE;
            }
        }

        #endregion

        #region ICancellable implementation

        /// <summary>
        /// Requests cancellation of the promise; has no effect if the promise is already resolved.
        /// </summary>
        public void Cancel() {
            SetCancelled();
        }

        #endregion
    }
}
219