Mercurial > pub > ImplabNet
comparison Implab/AbstractEvent.cs @ 156:97fbbf816844 v2
Promises: SignalXXX methods merged into SignalHandler method.
Components: RunnableComponent In progress
author | cin |
---|---|
date | Mon, 15 Feb 2016 04:22:15 +0300 |
parents | e6d4b41f0101 |
children | 5802131432e4 |
comparison
equal
deleted
inserted
replaced
155:037df317f126 | 156:97fbbf816844 |
---|---|
6 namespace Implab { | 6 namespace Implab { |
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { | 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { |
8 | 8 |
9 const int UNRESOLVED_SATE = 0; | 9 const int UNRESOLVED_SATE = 0; |
10 const int TRANSITIONAL_STATE = 1; | 10 const int TRANSITIONAL_STATE = 1; |
11 const int SUCCEEDED_STATE = 2; | 11 protected const int SUCCEEDED_STATE = 2; |
12 const int REJECTED_STATE = 3; | 12 protected const int REJECTED_STATE = 3; |
13 const int CANCELLED_STATE = 4; | 13 protected const int CANCELLED_STATE = 4; |
14 | 14 |
15 const int CANCEL_NOT_REQUESTED = 0; | 15 const int CANCEL_NOT_REQUESTED = 0; |
16 const int CANCEL_REQUESTING = 1; | 16 const int CANCEL_REQUESTING = 1; |
17 const int CANCEL_REQUESTED = 2; | 17 const int CANCEL_REQUESTED = 2; |
18 | 18 |
20 | 20 |
21 int m_state; | 21 int m_state; |
22 Exception m_error; | 22 Exception m_error; |
23 int m_handlersCount; | 23 int m_handlersCount; |
24 | 24 |
25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; | 25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; |
26 THandler[] m_handlers; | |
26 MTQueue<THandler> m_extraHandlers; | 27 MTQueue<THandler> m_extraHandlers; |
27 int m_handlerPointer = -1; | 28 int m_handlerPointer = -1; |
28 int m_handlersCommited; | 29 int m_handlersCommited; |
29 | 30 |
30 int m_cancelRequest; | 31 int m_cancelRequest; |
58 return true; | 59 return true; |
59 } | 60 } |
60 | 61 |
61 protected void EndSetResult() { | 62 protected void EndSetResult() { |
62 CompleteTransit(SUCCEEDED_STATE); | 63 CompleteTransit(SUCCEEDED_STATE); |
63 OnSuccess(); | 64 Signal(); |
64 } | 65 } |
65 | 66 |
66 | 67 |
67 | 68 |
68 /// <summary> | 69 /// <summary> |
76 /// <param name="error">Исключение возникшее при выполнении операции</param> | 77 /// <param name="error">Исключение возникшее при выполнении операции</param> |
77 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> | 78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> |
78 protected void SetError(Exception error) { | 79 protected void SetError(Exception error) { |
79 if (BeginTransit()) { | 80 if (BeginTransit()) { |
80 if (error is OperationCanceledException) { | 81 if (error is OperationCanceledException) { |
82 m_error = error.InnerException; | |
81 CompleteTransit(CANCELLED_STATE); | 83 CompleteTransit(CANCELLED_STATE); |
82 m_error = error.InnerException; | |
83 OnCancelled(); | |
84 } else { | 84 } else { |
85 m_error = error is PromiseTransientException ? error.InnerException : error; | 85 m_error = error is PromiseTransientException ? error.InnerException : error; |
86 CompleteTransit(REJECTED_STATE); | 86 CompleteTransit(REJECTED_STATE); |
87 OnError(); | |
88 } | 87 } |
88 Signal(); | |
89 } else { | 89 } else { |
90 WaitTransition(); | 90 WaitTransition(); |
91 if (m_state == SUCCEEDED_STATE) | 91 if (m_state == SUCCEEDED_STATE) |
92 throw new InvalidOperationException("The promise is already resolved"); | 92 throw new InvalidOperationException("The promise is already resolved"); |
93 } | 93 } |
99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> | 99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> |
100 protected void SetCancelled(Exception reason) { | 100 protected void SetCancelled(Exception reason) { |
101 if (BeginTransit()) { | 101 if (BeginTransit()) { |
102 m_error = reason; | 102 m_error = reason; |
103 CompleteTransit(CANCELLED_STATE); | 103 CompleteTransit(CANCELLED_STATE); |
104 OnCancelled(); | 104 Signal(); |
105 } | 105 } |
106 } | 106 } |
107 | 107 |
108 protected abstract void SignalSuccess(THandler handler); | 108 protected abstract void SignalHandler(THandler handler, int signal); |
109 | 109 |
110 protected abstract void SignalError(THandler handler, Exception error); | 110 void Signal() { |
111 | |
112 protected abstract void SignalCancelled(THandler handler, Exception reason); | |
113 | |
114 void OnSuccess() { | |
115 var hp = m_handlerPointer; | 111 var hp = m_handlerPointer; |
116 var slot = hp +1 ; | 112 var slot = hp +1 ; |
117 while (slot < m_handlersCommited) { | 113 while (slot < m_handlersCommited) { |
118 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | 114 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { |
119 SignalSuccess(m_handlers[slot]); | 115 SignalHandler(m_handlers[slot], m_state); |
120 } | 116 } |
121 hp = m_handlerPointer; | 117 hp = m_handlerPointer; |
122 slot = hp +1 ; | 118 slot = hp +1 ; |
123 } | 119 } |
124 | 120 |
125 | 121 |
126 if (m_extraHandlers != null) { | 122 if (m_extraHandlers != null) { |
127 THandler handler; | 123 THandler handler; |
128 while (m_extraHandlers.TryDequeue(out handler)) | 124 while (m_extraHandlers.TryDequeue(out handler)) |
129 SignalSuccess(handler); | 125 SignalHandler(handler, m_state); |
130 } | |
131 } | |
132 | |
133 void OnError() { | |
134 var hp = m_handlerPointer; | |
135 var slot = hp +1 ; | |
136 while (slot < m_handlersCommited) { | |
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
138 SignalError(m_handlers[slot],m_error); | |
139 } | |
140 hp = m_handlerPointer; | |
141 slot = hp +1 ; | |
142 } | |
143 | |
144 if (m_extraHandlers != null) { | |
145 THandler handler; | |
146 while (m_extraHandlers.TryDequeue(out handler)) | |
147 SignalError(handler, m_error); | |
148 } | |
149 } | |
150 | |
151 void OnCancelled() { | |
152 var hp = m_handlerPointer; | |
153 var slot = hp +1 ; | |
154 while (slot < m_handlersCommited) { | |
155 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { | |
156 SignalCancelled(m_handlers[slot], m_error); | |
157 } | |
158 hp = m_handlerPointer; | |
159 slot = hp +1 ; | |
160 } | |
161 | |
162 if (m_extraHandlers != null) { | |
163 THandler handler; | |
164 while (m_extraHandlers.TryDequeue(out handler)) | |
165 SignalCancelled(handler, m_error); | |
166 } | 126 } |
167 } | 127 } |
168 | 128 |
169 #endregion | 129 #endregion |
170 | 130 |
192 | 152 |
193 protected void AddHandler(THandler handler) { | 153 protected void AddHandler(THandler handler) { |
194 | 154 |
195 if (m_state > 1) { | 155 if (m_state > 1) { |
196 // the promise is in the resolved state, just invoke the handler | 156 // the promise is in the resolved state, just invoke the handler |
197 InvokeHandler(handler); | 157 SignalHandler(handler, m_state); |
198 } else { | 158 } else { |
199 var slot = Interlocked.Increment(ref m_handlersCount) - 1; | 159 var slot = Interlocked.Increment(ref m_handlersCount) - 1; |
200 | 160 |
201 if (slot < RESERVED_HANDLERS_COUNT) { | 161 if (slot < RESERVED_HANDLERS_COUNT) { |
162 | |
163 if (slot == 0) | |
164 Interlocked.CompareExchange(ref m_handlers, new THandler[RESERVED_HANDLERS_COUNT], null); | |
202 | 165 |
203 m_handlers[slot] = handler; | 166 m_handlers[slot] = handler; |
204 | 167 |
205 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { | 168 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { |
206 } | 169 } |
210 var hp = m_handlerPointer; | 173 var hp = m_handlerPointer; |
211 slot = hp + 1; | 174 slot = hp + 1; |
212 if (slot < m_handlersCommited) { | 175 if (slot < m_handlersCommited) { |
213 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) | 176 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) |
214 continue; | 177 continue; |
215 InvokeHandler(m_handlers[slot]); | 178 SignalHandler(m_handlers[slot], m_state); |
216 } | 179 } |
217 break; | 180 break; |
218 } while(true); | 181 } while(true); |
219 } | 182 } |
220 } else { | 183 } else { |
231 // if the promise has been resolved while we were adding the handler to the queue | 194 // if the promise has been resolved while we were adding the handler to the queue |
232 // we can't guarantee that someone is still processing it | 195 // we can't guarantee that someone is still processing it |
233 // therefore we need to fetch a handler from the queue and execute it | 196 // therefore we need to fetch a handler from the queue and execute it |
234 // note that the fetched handler may not be the one that we have added | 197 // note that the fetched handler may not be the one that we have added |
235 // we may even fetch no handlers at all :) | 198 // we may even fetch no handlers at all :) |
236 InvokeHandler(handler); | 199 SignalHandler(handler, m_state); |
237 } | 200 } |
238 } | |
239 } | |
240 | |
241 protected void InvokeHandler(THandler handler) { | |
242 switch (m_state) { | |
243 case SUCCEEDED_STATE: | |
244 SignalSuccess(handler); | |
245 break; | |
246 case CANCELLED_STATE: | |
247 SignalCancelled(handler, m_error); | |
248 break; | |
249 case REJECTED_STATE: | |
250 SignalError(handler, m_error); | |
251 break; | |
252 default: | |
253 throw new Exception(String.Format("Invalid promise state {0}", m_state)); | |
254 } | 201 } |
255 } | 202 } |
256 | 203 |
257 #endregion | 204 #endregion |
258 | 205 |