250
|
1 using System;
|
251
|
2 using System.Diagnostics;
|
250
|
3 using System.Threading;
|
|
4 using System.Threading.Tasks;
|
|
5
|
251
|
namespace Implab.Components {
    /// <summary>
    /// Base class for implementing components which support start and stop operations,
    /// such components may represent running services.
    /// </summary>
    /// <remarks>
    /// This class provides a basic lifecycle from the creation to the
    /// termination of the component. Every lifecycle transition is guarded by
    /// <see cref="SynchronizationObject"/> and is identified by a cookie, so the
    /// result of an operation which was superseded by a later one is ignored.
    /// </remarks>
    public abstract class RunnableComponent : IAsyncComponent, IRunnable, IInitializable, IDisposable {

        /// <summary>
        /// Binds the lifetime of a <see cref="CancellationTokenSource"/> to a task:
        /// when the task completes the associated token source is disposed.
        /// </summary>
        class AsyncOperationDescriptor {

            /// <summary>
            /// An already completed operation which has no cancellation token source.
            /// </summary>
            public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor();

            // null only for the 'None' descriptor; also used as the lock object
            // which serializes Cancel() and Done()
            readonly CancellationTokenSource m_cts;

            // set once the task has completed and the token source has been disposed
            bool m_done;

            /// <summary>
            /// The cancellation token of this operation, or
            /// <see cref="CancellationToken.None"/> when the operation has no token source.
            /// </summary>
            public CancellationToken Token {
                get { return m_cts == null ? CancellationToken.None : m_cts.Token; }
            }

            /// <summary>
            /// The task which completes (with the same outcome) after the wrapped
            /// task has completed and the token source has been disposed.
            /// </summary>
            public Task Task { get; private set; }

            private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) {
                m_cts = cts;
                Task = Chain(task);
            }

            private AsyncOperationDescriptor() {
                Task = Task.CompletedTask;
            }

            /// <summary>
            /// Requests the cancellation of the operation. Does nothing if the
            /// operation has already completed or has no token source.
            /// </summary>
            public void Cancel() {
                if (m_cts == null)
                    return;

                lock (m_cts) {
                    // the token source is disposed once the operation is done,
                    // cancelling it after that point would throw
                    if (!m_done)
                        m_cts.Cancel();
                }
            }

            // Marks the operation as completed and releases the token source.
            void Done() {
                if (m_cts != null) {
                    lock (m_cts) {
                        m_done = true;
                        m_cts.Dispose();
                    }
                } else {
                    m_done = true;
                }
            }

            // Awaits the specified task and releases the token source regardless
            // of the outcome; the outcome itself is propagated to the caller.
            async Task Chain(Task other) {
                try {
                    await other;
                } finally {
                    Done();
                }
            }

            /// <summary>
            /// Creates a cancellable operation.
            /// </summary>
            /// <param name="factory">The factory which starts the operation, it
            /// receives the token which is cancelled either by
            /// <see cref="Cancel"/> or through <paramref name="ct"/>.</param>
            /// <param name="ct">An external cancellation token, if it can be
            /// cancelled the token of the operation is linked to it.</param>
            /// <returns>The descriptor of the started operation.</returns>
            public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) {
                var cts = ct.CanBeCanceled ?
                    CancellationTokenSource.CreateLinkedTokenSource(ct) :
                    new CancellationTokenSource();

                Task task;
                try {
                    task = factory(cts.Token);
                } catch {
                    // the factory failed synchronously, no task will ever own
                    // the token source, so release it here
                    cts.Dispose();
                    throw;
                }

                return new AsyncOperationDescriptor(task, cts);
            }

        }

        // this lock is used to synchronize state flow of the component during
        // processing calls from a client and internal processes.
        readonly object m_lock = new object();

        // current operation cookie, used to check whether a call to
        // MoveSuccess/MoveFailed method belongs to the current
        // operation, if cookies didn't match ignore completion result.
        object m_cookie;

        // AsyncOperationDescriptor aggregates a task and its cancellation token
        AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None;

        ExecutionState m_state;

        /// <summary>
        /// The synchronization object which is used to coordinate the access of
        /// the component's client and of the processes running inside the
        /// component to the shared state, i.e. to such properties as
        /// <see cref="State"/> and <see cref="LastError"/>. Handlers of the
        /// <see cref="StateChanged"/> event are invoked with this lock already
        /// held, therefore they don't require additional synchronization.
        /// </summary>
        public object SynchronizationObject { get { return m_lock; } }

        /// <summary>
        /// Creates the component.
        /// </summary>
        /// <param name="initialized"><c>true</c> to create the component in the
        /// <see cref="ExecutionState.Ready"/> state, <c>false</c> to create it in
        /// the <see cref="ExecutionState.Created"/> state which requires a call to
        /// <see cref="Initialize"/>.</param>
        protected RunnableComponent(bool initialized) {
            State = initialized ? ExecutionState.Ready : ExecutionState.Created;
        }

        /// <summary>
        /// The task of the most recently scheduled lifecycle operation
        /// (initialization, start or stop), or a completed task when no
        /// operation has been scheduled yet.
        /// </summary>
        public Task Completion {
            get { return m_current.Task; }
        }

        /// <summary>
        /// The current execution state of the component. Changing the state
        /// raises the <see cref="StateChanged"/> event.
        /// </summary>
        public ExecutionState State {
            get { return m_state; }
            private set {
                if (m_state != value) {
                    m_state = value;
                    StateChanged.DispatchEvent(this, new StateChangeEventArgs {
                        State = value,
                        LastError = LastError
                    });
                }
            }
        }

        /// <summary>
        /// The error which moved the component to the failed state.
        /// </summary>
        public Exception LastError { get; private set; }

        /// <summary>
        /// Raised when the state of the component changes. Handlers of this
        /// event are invoked inside the <see cref="SynchronizationObject"/>
        /// lock and must complete as quickly as possible.
        /// </summary>
        public event EventHandler<StateChangeEventArgs> StateChanged;

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <remarks>
        /// Calling this method may lead to unexpected results if the component
        /// isn't in the stopped state. Call this method after the component is
        /// stopped if needed or if the component is in the failed state.
        /// Only the first call has an effect. The state is switched to
        /// <see cref="ExecutionState.Disposed"/> without raising
        /// <see cref="StateChanged"/>.
        /// </remarks>
        public void Dispose() {
            bool dispose = false;
            lock (SynchronizationObject) {
                if (m_state != ExecutionState.Disposed) {
                    dispose = true;
                    m_state = ExecutionState.Disposed;
                    // a fresh cookie makes results of pending operations ignored
                    m_cookie = new object();
                }
            }
            if (dispose) {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
        }

        ~RunnableComponent() {
            Dispose(false);
        }

        /// <summary>
        /// Releases all resources used by the current component regardless of its
        /// execution state.
        /// </summary>
        /// <param name="disposing"><c>true</c> when called from
        /// <see cref="Dispose()"/>, <c>false</c> when called from the finalizer
        /// during GC.</param>
        protected virtual void Dispose(bool disposing) {
        }

        /// <summary>
        /// Starts the initialization of the component, the result is reflected
        /// by <see cref="State"/> and <see cref="Completion"/>.
        /// </summary>
        /// <exception cref="InvalidOperationException">The component isn't in
        /// the <see cref="ExecutionState.Created"/> state.</exception>
        public void Initialize() {
            var cookie = new object();
            if (!MoveInitialize(cookie))
                throw new InvalidOperationException("The component can be initialized only in the Created state");

            Safe.NoWait(ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie));
        }

        /// <summary>
        /// This method is used for initialization during a component creation.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        /// <remarks>
        /// This method should be used for short and mostly synchronous operations,
        /// other operations which require time to run should be placed in
        /// <see cref="StartInternalAsync(CancellationToken)"/> method.
        /// </remarks>
        protected virtual Task InitializeInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Starts the component, the result is reflected by <see cref="State"/>
        /// and <see cref="Completion"/>.
        /// </summary>
        /// <param name="ct">A cancellation token for the start operation.</param>
        /// <exception cref="InvalidOperationException">The component isn't in
        /// the <see cref="ExecutionState.Ready"/> state.</exception>
        public void Start(CancellationToken ct) {
            var cookie = new object();
            if (!MoveStart(cookie))
                throw new InvalidOperationException("The component can be started only in the Ready state");

            Safe.NoWait(ScheduleStartAndRun(ct, cookie));
        }

        // Runs the start operation and, if it brought the component to the
        // running state, invokes RunInternal.
        async Task ScheduleStartAndRun(CancellationToken ct, object cookie) {
            try {
                await ScheduleTask(StartInternalAsync, ct, cookie);

                // The result of the start is ignored when the component has been
                // stopped or disposed in the meantime (the cookie doesn't match),
                // in that case the component never entered the running state and
                // RunInternal must not be called.
                bool running;
                lock (m_lock) {
                    running = m_state == ExecutionState.Running && m_cookie == cookie;
                }

                if (running)
                    RunInternal();
            } catch (Exception err) {
                // a failed start has already been recorded by MoveFailed, in
                // that case Fail does nothing since the state isn't Running
                Fail(err);
            }
        }

        /// <summary>
        /// Performs the actual start of the component.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation, it is
        /// also cancelled when the component is stopped during the start.</param>
        protected virtual Task StartInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// This method is called after the component has entered the running
        /// state, override it to perform actions which must happen once the
        /// component is running. The default implementation does nothing.
        /// </summary>
        /// <remarks>
        /// An exception thrown from this method moves the component to the
        /// failed state.
        /// </remarks>
        protected virtual void RunInternal() {

        }

        /// <summary>
        /// Stops the component, the result is reflected by <see cref="State"/>
        /// and <see cref="Completion"/>.
        /// </summary>
        /// <param name="ct">A cancellation token for the stop operation.</param>
        /// <exception cref="InvalidOperationException">The component is neither
        /// in the <see cref="ExecutionState.Starting"/> nor in the
        /// <see cref="ExecutionState.Running"/> state.</exception>
        public void Stop(CancellationToken ct) {
            var cookie = new object();
            if (!MoveStop(cookie))
                throw new InvalidOperationException("The component can be stopped only in the Starting or Running state");

            Safe.NoWait(ScheduleTask(StopAsync, ct, cookie));
        }

        // Cancels the operation which is currently in progress, waits for it to
        // complete and then performs the actual stop.
        async Task StopAsync(CancellationToken ct) {
            // This method starts running synchronously inside ScheduleTask
            // before m_current is replaced, so m_current still refers to the
            // operation which preceded the stop request.
            var previous = m_current;
            previous.Cancel();

            try {
                await previous.Task;
            } catch (OperationCanceledException) {
                // the previous operation has been cancelled by this very stop
                // request, this is the expected outcome and not a failure
            }

            ct.ThrowIfCancellationRequested();

            await StopInternalAsync(ct);
        }

        /// <summary>
        /// Performs the actual stop of the component.
        /// </summary>
        /// <param name="ct">A cancellation token for this operation</param>
        protected virtual Task StopInternalAsync(CancellationToken ct) {
            return Task.CompletedTask;
        }

        /// <summary>
        /// Moves the running component to the failed state. The call is ignored
        /// if the component isn't in the <see cref="ExecutionState.Running"/> state.
        /// </summary>
        /// <param name="err">The error which caused the failure, it becomes
        /// available through <see cref="LastError"/>.</param>
        protected void Fail(Exception err) {
            lock (m_lock) {
                if (m_state != ExecutionState.Running)
                    return;
                // a fresh cookie makes results of pending operations ignored
                m_cookie = new object();
                LastError = err;
                State = ExecutionState.Failed;
            }
        }


        #region state management

        // Created -> Initializing, returns false if the transition isn't allowed.
        bool MoveInitialize(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Created)
                    return false;
                State = ExecutionState.Initializing;
                m_cookie = cookie;
                return true;
            }
        }

        // Ready -> Starting, returns false if the transition isn't allowed.
        bool MoveStart(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Ready)
                    return false;
                State = ExecutionState.Starting;
                m_cookie = cookie;
                return true;
            }
        }

        // Starting|Running -> Stopping, returns false if the transition isn't allowed.
        bool MoveStop(object cookie) {
            lock (m_lock) {
                if (State != ExecutionState.Starting && State != ExecutionState.Running)
                    return false;
                State = ExecutionState.Stopping;
                m_cookie = cookie;
                return true;
            }
        }

        // Completes the transition started by the operation identified by the
        // cookie, does nothing if that operation is no longer the current one.
        void MoveSuccess(object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;
                switch (State) {
                    case ExecutionState.Initializing:
                        State = ExecutionState.Ready;
                        break;
                    case ExecutionState.Starting:
                        State = ExecutionState.Running;
                        break;
                    case ExecutionState.Stopping:
                        State = ExecutionState.Stopped;
                        break;
                }
            }
        }

        // Moves the component to the failed state if the operation identified
        // by the cookie is still the current one.
        void MoveFailed(Exception err, object cookie) {
            lock (m_lock) {
                if (m_cookie != cookie)
                    return;
                // LastError is assigned first to make it available to the
                // StateChanged handlers
                LastError = err;
                State = ExecutionState.Failed;
            }
        }

        // Starts the specified operation and makes it the current one, the
        // outcome of the operation completes the state transition identified
        // by the cookie.
        Task ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) {

            var op = AsyncOperationDescriptor.Create(async (x) => {
                try {
                    await next(x);
                    MoveSuccess(cookie);
                } catch (Exception e) {
                    MoveFailed(e, cookie);
                    throw;
                }
            }, ct);

            // assigned only after the operation has been started, see StopAsync
            // which relies on m_current still referring to the previous operation
            m_current = op;
            return op.Task;
        }

        #endregion
    }
}