Mercurial > pub > ImplabNet
comparison Implab/AbstractPromise.cs @ 119:2573b562e328 v2
Promises rewritten, added improved version of AsyncQueue
author | cin |
---|---|
date | Sun, 11 Jan 2015 19:13:02 +0300 |
parents | |
children | f803565868a4 |
comparison
equal
deleted
inserted
replaced
118:e046a94eecb1 | 119:2573b562e328 |
---|---|
1 using System; | |
2 using Implab.Parallels; | |
3 using System.Threading; | |
4 using System.Reflection; | |
5 | |
6 namespace Implab { | |
7 public abstract class AbstractPromise<THandler> { | |
8 | |
9 const int UNRESOLVED_SATE = 0; | |
10 const int TRANSITIONAL_STATE = 1; | |
11 const int SUCCEEDED_STATE = 2; | |
12 const int REJECTED_STATE = 3; | |
13 const int CANCELLED_STATE = 4; | |
14 | |
15 int m_state; | |
16 Exception m_error; | |
17 | |
18 readonly AsyncQueue<THandler> m_handlers = new AsyncQueue<THandler>(); | |
19 | |
20 #region state managment | |
21 bool BeginTransit() { | |
22 return UNRESOLVED_SATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, UNRESOLVED_SATE); | |
23 } | |
24 | |
25 void CompleteTransit(int state) { | |
26 if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE)) | |
27 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state"); | |
28 } | |
29 | |
30 void WaitTransition() { | |
31 while (m_state == TRANSITIONAL_STATE) { | |
32 Thread.MemoryBarrier(); | |
33 } | |
34 } | |
35 | |
36 protected void BeginSetResult() { | |
37 if (!BeginTransit()) { | |
38 WaitTransition(); | |
39 if (m_state != CANCELLED_STATE) | |
40 throw new InvalidOperationException("The promise is already resolved"); | |
41 } | |
42 } | |
43 | |
44 protected void EndSetResult() { | |
45 CompleteTransit(SUCCEEDED_STATE); | |
46 OnSuccess(); | |
47 } | |
48 | |
49 | |
50 | |
51 /// <summary> | |
52 /// Выполняет обещание, сообщая об ошибке | |
53 /// </summary> | |
54 /// <remarks> | |
55 /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков | |
56 /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные | |
57 /// будут проигнорированы. | |
58 /// </remarks> | |
59 /// <param name="error">Исключение возникшее при выполнении операции</param> | |
60 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | |
61 protected void SetError(Exception error) { | |
62 if (BeginTransit()) { | |
63 m_error = error is PromiseTransientException ? error.InnerException : error; | |
64 CompleteTransit(REJECTED_STATE); | |
65 OnError(); | |
66 } else { | |
67 WaitTransition(); | |
68 if (m_state == SUCCEEDED_STATE) | |
69 throw new InvalidOperationException("The promise is already resolved"); | |
70 } | |
71 } | |
72 | |
73 /// <summary> | |
74 /// Отменяет операцию, если это возможно. | |
75 /// </summary> | |
76 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | |
77 protected void SetCancelled() { | |
78 if (BeginTransit()) { | |
79 CompleteTransit(CANCELLED_STATE); | |
80 OnCancelled(); | |
81 } | |
82 } | |
83 | |
84 protected abstract void SignalSuccess(THandler handler); | |
85 | |
86 protected abstract void SignalError(THandler handler, Exception error); | |
87 | |
88 protected abstract void SignalCancelled(THandler handler); | |
89 | |
90 void OnSuccess() { | |
91 THandler handler; | |
92 while (m_handlers.TryDequeue(out handler)) | |
93 SignalSuccess(handler); | |
94 } | |
95 | |
96 void OnError() { | |
97 THandler handler; | |
98 while (m_handlers.TryDequeue(out handler)) | |
99 SignalError(handler,m_error); | |
100 } | |
101 | |
102 void OnCancelled() { | |
103 THandler handler; | |
104 while (m_handlers.TryDequeue(out handler)) | |
105 SignalCancelled(handler); | |
106 } | |
107 | |
108 #endregion | |
109 | |
110 protected abstract void Listen(PromiseEventType events, Action handler); | |
111 | |
112 #region synchronization traits | |
113 protected void WaitResult(int timeout) { | |
114 if (!IsResolved) { | |
115 var lk = new object(); | |
116 | |
117 Listen(PromiseEventType.All, () => { | |
118 lock(lk) { | |
119 Monitor.Pulse(lk); | |
120 } | |
121 }); | |
122 | |
123 lock (lk) { | |
124 while(!IsResolved) { | |
125 if(!Monitor.Wait(lk,timeout)) | |
126 throw new TimeoutException(); | |
127 } | |
128 } | |
129 | |
130 } | |
131 switch (m_state) { | |
132 case SUCCEEDED_STATE: | |
133 return; | |
134 case CANCELLED_STATE: | |
135 throw new OperationCanceledException(); | |
136 case REJECTED_STATE: | |
137 throw new TargetInvocationException(m_error); | |
138 default: | |
139 throw new ApplicationException(String.Format("Invalid promise state {0}", m_state)); | |
140 } | |
141 } | |
142 #endregion | |
143 | |
144 #region handlers managment | |
145 | |
146 protected void AddHandler(THandler handler) { | |
147 | |
148 if (IsResolved) { | |
149 InvokeHandler(handler); | |
150 | |
151 } else { | |
152 // the promise is in the resolved state, just invoke the handler | |
153 m_handlers.Enqueue(handler); | |
154 | |
155 | |
156 if (IsResolved && m_handlers.TryDequeue(out handler)) | |
157 // if the promise have been resolved while we was adding the handler to the queue | |
158 // we can't guarantee that someone is still processing it | |
159 // therefore we need to fetch a handler from the queue and execute it | |
160 // note that fetched handler may be not the one that we have added | |
161 // even we can fetch no handlers at all :) | |
162 InvokeHandler(handler); | |
163 } | |
164 } | |
165 | |
166 protected void InvokeHandler(THandler handler) { | |
167 switch (m_state) { | |
168 case SUCCEEDED_STATE: | |
169 SignalSuccess(handler); | |
170 break; | |
171 case CANCELLED_STATE: | |
172 SignalCancelled(handler); | |
173 break; | |
174 case REJECTED_STATE: | |
175 SignalError(handler, m_error); | |
176 break; | |
177 default: | |
178 throw new Exception(String.Format("Invalid promise state {0}", m_state)); | |
179 } | |
180 } | |
181 | |
182 #endregion | |
183 | |
184 #region IPromise implementation | |
185 | |
186 public void Join(int timeout) { | |
187 WaitResult(timeout); | |
188 } | |
189 | |
190 public void Join() { | |
191 WaitResult(-1); | |
192 } | |
193 | |
194 public bool IsResolved { | |
195 get { | |
196 Thread.MemoryBarrier(); | |
197 return m_state > 1; | |
198 } | |
199 } | |
200 | |
201 public bool IsCancelled { | |
202 get { | |
203 Thread.MemoryBarrier(); | |
204 return m_state == CANCELLED_STATE; | |
205 } | |
206 } | |
207 | |
208 #endregion | |
209 | |
210 #region ICancellable implementation | |
211 | |
212 public void Cancel() { | |
213 SetCancelled(); | |
214 } | |
215 | |
216 #endregion | |
217 } | |
218 } | |
219 |