comparison Implab/AbstractEvent.cs @ 156:97fbbf816844 v2

Promises: SignalXXX methods merged into SignalHandler method. Components: RunnableComponent In progress
author cin
date Mon, 15 Feb 2016 04:22:15 +0300
parents e6d4b41f0101
children 5802131432e4
comparison
equal deleted inserted replaced
155:037df317f126 156:97fbbf816844
6 namespace Implab { 6 namespace Implab {
7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable { 7 public abstract class AbstractEvent<THandler> : ICancellationToken, ICancellable {
8 8
9 const int UNRESOLVED_SATE = 0; 9 const int UNRESOLVED_SATE = 0;
10 const int TRANSITIONAL_STATE = 1; 10 const int TRANSITIONAL_STATE = 1;
11 const int SUCCEEDED_STATE = 2; 11 protected const int SUCCEEDED_STATE = 2;
12 const int REJECTED_STATE = 3; 12 protected const int REJECTED_STATE = 3;
13 const int CANCELLED_STATE = 4; 13 protected const int CANCELLED_STATE = 4;
14 14
15 const int CANCEL_NOT_REQUESTED = 0; 15 const int CANCEL_NOT_REQUESTED = 0;
16 const int CANCEL_REQUESTING = 1; 16 const int CANCEL_REQUESTING = 1;
17 const int CANCEL_REQUESTED = 2; 17 const int CANCEL_REQUESTED = 2;
18 18
20 20
21 int m_state; 21 int m_state;
22 Exception m_error; 22 Exception m_error;
23 int m_handlersCount; 23 int m_handlersCount;
24 24
25 readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT]; 25 //readonly THandler[] m_handlers = new THandler[RESERVED_HANDLERS_COUNT];
26 THandler[] m_handlers;
26 MTQueue<THandler> m_extraHandlers; 27 MTQueue<THandler> m_extraHandlers;
27 int m_handlerPointer = -1; 28 int m_handlerPointer = -1;
28 int m_handlersCommited; 29 int m_handlersCommited;
29 30
30 int m_cancelRequest; 31 int m_cancelRequest;
58 return true; 59 return true;
59 } 60 }
60 61
61 protected void EndSetResult() { 62 protected void EndSetResult() {
62 CompleteTransit(SUCCEEDED_STATE); 63 CompleteTransit(SUCCEEDED_STATE);
63 OnSuccess(); 64 Signal();
64 } 65 }
65 66
66 67
67 68
68 /// <summary> 69 /// <summary>
76 /// <param name="error">Исключение возникшее при выполнении операции</param> 77 /// <param name="error">Исключение возникшее при выполнении операции</param>
77 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> 78 /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception>
78 protected void SetError(Exception error) { 79 protected void SetError(Exception error) {
79 if (BeginTransit()) { 80 if (BeginTransit()) {
80 if (error is OperationCanceledException) { 81 if (error is OperationCanceledException) {
82 m_error = error.InnerException;
81 CompleteTransit(CANCELLED_STATE); 83 CompleteTransit(CANCELLED_STATE);
82 m_error = error.InnerException;
83 OnCancelled();
84 } else { 84 } else {
85 m_error = error is PromiseTransientException ? error.InnerException : error; 85 m_error = error is PromiseTransientException ? error.InnerException : error;
86 CompleteTransit(REJECTED_STATE); 86 CompleteTransit(REJECTED_STATE);
87 OnError();
88 } 87 }
88 Signal();
89 } else { 89 } else {
90 WaitTransition(); 90 WaitTransition();
91 if (m_state == SUCCEEDED_STATE) 91 if (m_state == SUCCEEDED_STATE)
92 throw new InvalidOperationException("The promise is already resolved"); 92 throw new InvalidOperationException("The promise is already resolved");
93 } 93 }
99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks> 99 /// <remarks>Для определения была ли операция отменена следует использовать свойство <see cref="IsCancelled"/>.</remarks>
100 protected void SetCancelled(Exception reason) { 100 protected void SetCancelled(Exception reason) {
101 if (BeginTransit()) { 101 if (BeginTransit()) {
102 m_error = reason; 102 m_error = reason;
103 CompleteTransit(CANCELLED_STATE); 103 CompleteTransit(CANCELLED_STATE);
104 OnCancelled(); 104 Signal();
105 } 105 }
106 } 106 }
107 107
108 protected abstract void SignalSuccess(THandler handler); 108 protected abstract void SignalHandler(THandler handler, int signal);
109 109
110 protected abstract void SignalError(THandler handler, Exception error); 110 void Signal() {
111
112 protected abstract void SignalCancelled(THandler handler, Exception reason);
113
114 void OnSuccess() {
115 var hp = m_handlerPointer; 111 var hp = m_handlerPointer;
116 var slot = hp +1 ; 112 var slot = hp +1 ;
117 while (slot < m_handlersCommited) { 113 while (slot < m_handlersCommited) {
118 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) { 114 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
119 SignalSuccess(m_handlers[slot]); 115 SignalHandler(m_handlers[slot], m_state);
120 } 116 }
121 hp = m_handlerPointer; 117 hp = m_handlerPointer;
122 slot = hp +1 ; 118 slot = hp +1 ;
123 } 119 }
124 120
125 121
126 if (m_extraHandlers != null) { 122 if (m_extraHandlers != null) {
127 THandler handler; 123 THandler handler;
128 while (m_extraHandlers.TryDequeue(out handler)) 124 while (m_extraHandlers.TryDequeue(out handler))
129 SignalSuccess(handler); 125 SignalHandler(handler, m_state);
130 }
131 }
132
133 void OnError() {
134 var hp = m_handlerPointer;
135 var slot = hp +1 ;
136 while (slot < m_handlersCommited) {
137 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
138 SignalError(m_handlers[slot],m_error);
139 }
140 hp = m_handlerPointer;
141 slot = hp +1 ;
142 }
143
144 if (m_extraHandlers != null) {
145 THandler handler;
146 while (m_extraHandlers.TryDequeue(out handler))
147 SignalError(handler, m_error);
148 }
149 }
150
151 void OnCancelled() {
152 var hp = m_handlerPointer;
153 var slot = hp +1 ;
154 while (slot < m_handlersCommited) {
155 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) == hp) {
156 SignalCancelled(m_handlers[slot], m_error);
157 }
158 hp = m_handlerPointer;
159 slot = hp +1 ;
160 }
161
162 if (m_extraHandlers != null) {
163 THandler handler;
164 while (m_extraHandlers.TryDequeue(out handler))
165 SignalCancelled(handler, m_error);
166 } 126 }
167 } 127 }
168 128
169 #endregion 129 #endregion
170 130
192 152
193 protected void AddHandler(THandler handler) { 153 protected void AddHandler(THandler handler) {
194 154
195 if (m_state > 1) { 155 if (m_state > 1) {
196 // the promise is in the resolved state, just invoke the handler 156 // the promise is in the resolved state, just invoke the handler
197 InvokeHandler(handler); 157 SignalHandler(handler, m_state);
198 } else { 158 } else {
199 var slot = Interlocked.Increment(ref m_handlersCount) - 1; 159 var slot = Interlocked.Increment(ref m_handlersCount) - 1;
200 160
201 if (slot < RESERVED_HANDLERS_COUNT) { 161 if (slot < RESERVED_HANDLERS_COUNT) {
162
163 if (slot == 0)
164 Interlocked.CompareExchange(ref m_handlers, new THandler[RESERVED_HANDLERS_COUNT], null);
202 165
203 m_handlers[slot] = handler; 166 m_handlers[slot] = handler;
204 167
205 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) { 168 while (slot != Interlocked.CompareExchange(ref m_handlersCommited, slot + 1, slot)) {
206 } 169 }
210 var hp = m_handlerPointer; 173 var hp = m_handlerPointer;
211 slot = hp + 1; 174 slot = hp + 1;
212 if (slot < m_handlersCommited) { 175 if (slot < m_handlersCommited) {
213 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp) 176 if (Interlocked.CompareExchange(ref m_handlerPointer, slot, hp) != hp)
214 continue; 177 continue;
215 InvokeHandler(m_handlers[slot]); 178 SignalHandler(m_handlers[slot], m_state);
216 } 179 }
217 break; 180 break;
218 } while(true); 181 } while(true);
219 } 182 }
220 } else { 183 } else {
231 // if the promise has been resolved while we were adding the handler to the queue 194 // if the promise has been resolved while we were adding the handler to the queue
232 // we can't guarantee that someone is still processing it 195 // we can't guarantee that someone is still processing it
233 // therefore we need to fetch a handler from the queue and execute it 196 // therefore we need to fetch a handler from the queue and execute it
234 // note that the fetched handler may not be the one that we added 197 // note that the fetched handler may not be the one that we added
235 // we may even fetch no handlers at all :) 198 // we may even fetch no handlers at all :)
236 InvokeHandler(handler); 199 SignalHandler(handler, m_state);
237 } 200 }
238 }
239 }
240
241 protected void InvokeHandler(THandler handler) {
242 switch (m_state) {
243 case SUCCEEDED_STATE:
244 SignalSuccess(handler);
245 break;
246 case CANCELLED_STATE:
247 SignalCancelled(handler, m_error);
248 break;
249 case REJECTED_STATE:
250 SignalError(handler, m_error);
251 break;
252 default:
253 throw new Exception(String.Format("Invalid promise state {0}", m_state));
254 } 201 }
255 } 202 }
256 203
257 #endregion 204 #endregion
258 205