250
|
1 using System;
|
251
|
2 using System.Diagnostics;
|
250
|
3 using System.Threading;
|
|
4 using System.Threading.Tasks;
|
|
5
|
251
|
6 namespace Implab.Components {
|
|
7 /// <summary>
|
|
8 /// Base class for implementing components which support start and stop operations,
|
|
9 /// such components may represent running services.
|
|
10 /// </summary>
|
|
11 /// <remarks>
|
|
12 /// This class provides a basic lifecycle from the creation to the
|
|
13 /// termination of the component.
|
|
14 /// </remarks>
|
257
|
15 public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {
|
251
|
16
|
|
/// <summary>
/// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
/// when the task completes, the associated token source is disposed.
/// </summary>
class AsyncOperationDescriptor {

    /// <summary>
    /// Descriptor which stands for "no operation": its task is already
    /// completed and it owns no token source, so <see cref="Cancel"/> is a no-op.
    /// </summary>
    public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

    // Null only for the None descriptor. The instance also serves as the
    // lock which guards m_done.
    readonly CancellationTokenSource m_cts;

    // Set once the operation has finished and m_cts has been disposed.
    bool m_done;

    /// <summary>
    /// The token passed to the operation. It is captured at construction so
    /// reading it stays safe after the token source has been disposed.
    /// </summary>
    public CancellationToken Token { get; }

    /// <summary>
    /// The task which completes (with the same outcome as the wrapped
    /// operation) after the token source has been disposed.
    /// </summary>
    public Task Task { get; private set; }

    private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
        m_cts = cts;
        Token = cts.Token;
        Task = Chain(task);
    }

    private AsyncOperationDescriptor() {
        // Token keeps its default value, which equals CancellationToken.None.
        Task = Task.CompletedTask;
    }

    /// <summary>
    /// Requests cancellation of the operation. Does nothing when the
    /// operation has already finished, since the token source is disposed
    /// at that point.
    /// </summary>
    public void Cancel() {
        if (m_cts == null) {
            return;
        }

        lock (m_cts) {
            if (!m_done) {
                m_cts.Cancel();
            }
        }
    }

    // Marks the operation as finished and releases the token source.
    void Done() {
        if (m_cts == null) {
            m_done = true;
            return;
        }

        lock (m_cts) {
            m_done = true;
            m_cts.Dispose();
        }
    }

    // Awaits the operation and always releases the token source afterwards,
    // whether the operation succeeded, failed or was cancelled.
    async Task Chain(Task other) {
        try {
            await other;
        } finally {
            Done();
        }
    }

    /// <summary>
    /// Starts an operation and wraps it into a descriptor.
    /// </summary>
    /// <param name="factory">Starts the operation; receives the token which
    /// is cancelled by <see cref="Cancel"/> or by <paramref name="ct"/>.</param>
    /// <param name="ct">An external cancellation token; when it can be
    /// cancelled the operation token is linked to it.</param>
    /// <returns>The descriptor of the started operation.</returns>
    public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
        var cts = ct.CanBeCanceled ?
            CancellationTokenSource.CreateLinkedTokenSource(ct) :
            new CancellationTokenSource();

        Task task;
        try {
            task = factory(cts.Token);
        } catch {
            // The factory failed synchronously, so no task will ever dispose
            // the token source: release it here and let the error propagate.
            cts.Dispose();
            throw;
        }

        return new AsyncOperationDescriptor(task, cts);
    }

}
|
|
81
|
|
82 // this lock is used to synchronize state flow of the component during
|
256
|
83 // processing calls from a client and internal processes.
|
250
|
84 readonly object m_lock = new object();
|
|
85
|
251
|
// current operation cookie, used to check whether a call to
// MoveSuccess/MoveFailed method belongs to the current
// operation; if the cookies don't match, the completion result is ignored.
|
|
89 object m_cookie;
|
250
|
90
|
256
|
// AsyncOperationDescriptor aggregates a task and its cancellation token
|
251
|
92 AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;
|
|
93
|
|
94 ExecutionState m_state;
|
|
95
|
257
|
/// <summary>
/// The synchronization object used to coordinate access to the shared state
/// of the component, i.e. to such properties as <see cref="State"/> and
/// <see cref="LastError"/>, between the client of the component and the
/// processes running inside it. Handlers of the <see cref="StateChanged"/>
/// event are invoked with this lock already held, therefore they need no
/// additional synchronization.
/// </summary>
public object SynchronizationObject => m_lock;
|
251
|
105
|
|
/// <summary>
/// Creates the component and selects its initial state.
/// </summary>
/// <param name="initialized">When true the component starts in the
/// <c>Ready</c> state, otherwise it starts in the <c>Created</c> state.</param>
protected RunnableComponent(bool initialized) {
    if (initialized) {
        State = ExecutionState.Ready;
    } else {
        State = ExecutionState.Created;
    }
}
|
|
109
|
251
|
/// <summary>
/// The task of the most recently scheduled operation.
/// </summary>
public Task Completion => m_current.Task;
|
250
|
113
|
251
|
/// <summary>
/// The current execution state of the component. Assigning a different
/// value raises <see cref="StateChanged"/>; assigning the same value does nothing.
/// </summary>
public ExecutionState State {
    get { return m_state; }
    private set {
        if (m_state == value) {
            return;
        }

        m_state = value;

        // The event carries the new state together with the error known at
        // this moment, so LastError must be assigned before State.
        var args = new StateChangeEventArgs {
            State = value,
            LastError = LastError
        };
        StateChanged.DispatchEvent(this, args);
    }
}
|
|
126
|
|
127 public Exception LastError { get; private set; }
|
250
|
128
|
257
|
/// <summary>
/// Raised when the state of the component changes. Handlers of this event
/// are invoked inside the <see cref="SynchronizationObject"/> lock and must
/// complete as quickly as possible.
/// </summary>
|
250
|
134 public event EventHandler<StateChangeEventArgs> StateChanged;
|
|
135
|
251
|
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state.
/// </summary>
/// <remarks>
/// Calling this method may lead to unexpected results if the component
/// isn't in the stopped state. Call it after the component has been stopped
/// or when the component is in the failed state. Only the first call has
/// an effect, subsequent calls are ignored.
/// </remarks>
public void Dispose() {
    lock (m_lock) {
        if (m_state == ExecutionState.Disposed) {
            return;
        }

        // The state is switched directly, no StateChanged event is raised.
        m_state = ExecutionState.Disposed;
        // A fresh cookie makes the completion of any pending operation ignored.
        m_cookie = new object();
    }

    // The cleanup itself runs outside of the lock.
    Dispose(true);
    GC.SuppressFinalize(this);
}
|
|
159
|
251
|
/// <summary>
/// Finalizer, runs the cleanup in the non-disposing mode.
/// </summary>
~RunnableComponent() {
    Dispose(disposing: false);
}
|
|
163
|
|
/// <summary>
/// Releases all resources used by the current component regardless of its
/// execution state. The base implementation does nothing.
/// </summary>
/// <param name="disposing"><c>true</c> when called from
/// <see cref="Dispose()"/>, <c>false</c> when called from the finalizer.</param>
protected virtual void Dispose(bool disposing) {
}
|
|
172
|
|
/// <summary>
/// Starts the initialization of the component without an external
/// cancellation token.
/// </summary>
public void Initialize() => Initialize(CancellationToken.None);
|
|
176
|
|
/// <summary>
/// Starts the initialization of the component.
/// </summary>
/// <param name="ct">A cancellation token for the initialization.</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <c>Created</c> state.</exception>
public void Initialize(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveInitialize(operationCookie)) {
        throw new InvalidOperationException();
    }

    // The operation isn't awaited here; its task is handed to Safe.NoWait.
    Safe.NoWait(ScheduleTask(InitializeInternalAsync, ct, operationCookie));
}
|
|
184
|
|
/// <summary>
/// Performs the initialization of the component. The base implementation
/// completes immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the initialization.</returns>
/// <remarks>
/// This method is meant for short and mostly synchronous operations; work
/// which requires time to run should be placed in the
/// <see cref="StartInternalAsync(CancellationToken)"/> method.
/// </remarks>
protected virtual Task InitializeInternalAsync(CancellationToken ct) => Task.CompletedTask;
|
|
197
|
262
|
/// <summary>
/// Starts the component without an external cancellation token.
/// </summary>
public void Start() => Start(CancellationToken.None);
|
|
201
|
250
|
/// <summary>
/// Starts the component.
/// </summary>
/// <param name="ct">A cancellation token for the start operation.</param>
/// <exception cref="InvalidOperationException">The component isn't in the
/// <c>Ready</c> state.</exception>
public void Start(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStart(operationCookie)) {
        throw new InvalidOperationException();
    }

    // The operation isn't awaited here; its task is handed to Safe.NoWait.
    Safe.NoWait(ScheduleStartAndRun(ct, operationCookie));
}
|
|
209
|
257
|
// Runs the start operation and then RunInternal; any exception from either
// step is passed to Fail, which acts only while the component is running.
async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
    try {
        var startup = ScheduleTask(StartInternalAsync, ct, cookie);
        await startup;

        // NOTE(review): when the cookie no longer matches (e.g. Stop was
        // requested meanwhile) MoveSuccess silently ignores the completion,
        // yet RunInternal is still invoked - confirm this is intended.
        RunInternal();
    } catch (Exception error) {
        Fail(error);
    }
}
|
|
218
|
252
|
/// <summary>
/// Performs the start of the component. The base implementation completes
/// immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the start operation.</returns>
protected virtual Task StartInternalAsync(CancellationToken ct) => Task.CompletedTask;
|
|
222
|
257
|
/// <summary>
/// This method is called after the start operation has completed; override
/// it to perform the work which should begin at that point. The base
/// implementation does nothing.
/// </summary>
/// <remarks>
/// An exception thrown from this method is caught by the caller and passed
/// to <see cref="Fail(Exception)"/>.
/// </remarks>
protected virtual void RunInternal() {

}
|
|
230
|
262
|
/// <summary>
/// Stops the component without an external cancellation token.
/// </summary>
public void Stop() => Stop(CancellationToken.None);
|
|
234
|
251
|
/// <summary>
/// Stops the component.
/// </summary>
/// <param name="ct">A cancellation token for the stop operation.</param>
/// <exception cref="InvalidOperationException">The component is neither in
/// the <c>Starting</c> nor in the <c>Running</c> state.</exception>
public void Stop(CancellationToken ct) {
    var operationCookie = new object();

    if (!MoveStop(operationCookie)) {
        throw new InvalidOperationException();
    }

    // The operation isn't awaited here; its task is handed to Safe.NoWait.
    Safe.NoWait(ScheduleTask(StopAsync, ct, operationCookie));
}
|
|
242
|
|
// Cancels the operation which is current at the moment of the call, waits
// for it to finish and then runs StopInternalAsync.
async Task StopAsync(CancellationToken ct) {
    m_current.Cancel();

    // Taken right after the cancellation request, before the first await.
    var pending = Completion;
    try {
        await pending;
    } catch (OperationCanceledException) {
        // Expected: the operation has just been asked to cancel.
    }

    ct.ThrowIfCancellationRequested();

    await StopInternalAsync(ct);
}
|
|
256
|
|
/// <summary>
/// Performs the stop of the component. The base implementation completes
/// immediately.
/// </summary>
/// <param name="ct">A cancellation token for this operation.</param>
/// <returns>The task which represents the stop operation.</returns>
protected virtual Task StopInternalAsync(CancellationToken ct) => Task.CompletedTask;
|
|
260
|
256
|
/// <summary>
/// Moves a running component to the <c>Failed</c> state. The call is
/// ignored when the component isn't in the <c>Running</c> state.
/// </summary>
/// <param name="err">The error which caused the failure, it is stored in
/// <see cref="LastError"/>.</param>
protected void Fail(Exception err) {
    lock (m_lock) {
        if (m_state == ExecutionState.Running) {
            // A fresh cookie makes the completion of any pending operation ignored.
            m_cookie = new object();
            // The error is stored first so the state change event carries it.
            LastError = err;
            State = ExecutionState.Failed;
        }
    }
}
|
|
270
|
251
|
271
|
|
272 #region state management
|
|
273
|
|
// Tries to move the component from Created to Initializing and to register
// the cookie of the new operation; returns false from any other state.
bool MoveInitialize(object cookie) {
    lock (m_lock) {
        if (State == ExecutionState.Created) {
            State = ExecutionState.Initializing;
            m_cookie = cookie;
            return true;
        }

        return false;
    }
}
|
|
283
|
|
// Tries to move the component from Ready to Starting and to register the
// cookie of the new operation; returns false from any other state.
bool MoveStart(object cookie) {
    lock (m_lock) {
        if (State == ExecutionState.Ready) {
            State = ExecutionState.Starting;
            m_cookie = cookie;
            return true;
        }

        return false;
    }
}
|
|
293
|
|
// Tries to move the component from Starting or Running to Stopping and to
// register the cookie of the new operation; returns false from any other state.
bool MoveStop(object cookie) {
    lock (m_lock) {
        var stoppable = State == ExecutionState.Starting || State == ExecutionState.Running;
        if (!stoppable) {
            return false;
        }

        State = ExecutionState.Stopping;
        m_cookie = cookie;
        return true;
    }
}
|
|
303
|
|
// Completes the transitional state of the operation identified by the
// cookie: Initializing -> Ready, Starting -> Running, Stopping -> Stopped.
// Does nothing when the cookie doesn't belong to the current operation.
void MoveSuccess(object cookie) {
    lock (m_lock) {
        if (!ReferenceEquals(m_cookie, cookie)) {
            return;
        }

        var current = State;
        if (current == ExecutionState.Initializing) {
            State = ExecutionState.Ready;
        } else if (current == ExecutionState.Starting) {
            State = ExecutionState.Running;
        } else if (current == ExecutionState.Stopping) {
            State = ExecutionState.Stopped;
        }
    }
}
|
|
321
|
262
|
// Moves the component to the Failed state on behalf of the operation
// identified by the cookie. Returns false, changing nothing, when the
// cookie doesn't belong to the current operation.
bool MoveFailed(Exception err, object cookie) {
    lock (m_lock) {
        if (ReferenceEquals(m_cookie, cookie)) {
            // The error is stored first so the state change event carries it.
            LastError = err;
            State = ExecutionState.Failed;
            return true;
        }

        return false;
    }
}
|
|
331
|
257
|
// Starts the operation, reports its outcome to the state machine using the
// supplied cookie and makes the operation the current one.
Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {

    Func<CancellationToken, Task> tracked = async token => {
        try {
            await next(token);
            MoveSuccess(cookie);
        } catch (Exception error) {
            MoveFailed(error, cookie);
            throw;
        }
    };

    // Create invokes the delegate right away, so while the operation runs
    // its synchronous part m_current still refers to the previous operation.
    var descriptor = AsyncOperationDescriptor.Create(tracked, ct);

    m_current = descriptor;
    return descriptor.Task;
}
|
251
|
347
|
|
348 #endregion
|
250
|
349 }
|
|
350 } |