Mercurial > pub > ImplabNet
changeset 251:7c7e9ad6fe4a v3
Prerelease version of RunnableComponent
Added draft messaging interfaces
Added more more helpers to Xml/SerializationHelpers
author | cin |
---|---|
date | Sun, 11 Feb 2018 00:49:51 +0300 |
parents | 9f63dade3a40 |
children | 6f4630d0bcd9 |
files | Implab.Test/UnitTest1.cs Implab/Automaton/DFATable.cs Implab/Components/ExecutionState.cs Implab/Components/IInitializable.cs Implab/Components/IRunnable.cs Implab/Components/ObjectPool.cs Implab/Components/RunnableComponent.cs Implab/Deferred.cs Implab/Deferred`1.cs Implab/Formats/Json/JsonStringScanner.cs Implab/Messaging/IConsumer.cs Implab/Messaging/IProducer.cs Implab/Messaging/ISession.cs Implab/Parallels/BlockingQueue.cs Implab/Safe.cs Implab/TaskHelpers.cs Implab/Xml/JsonXmlReader.cs Implab/Xml/SerializationHelpers.cs |
diffstat | 18 files changed, 447 insertions(+), 52 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/UnitTest1.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab.Test/UnitTest1.cs Sun Feb 11 00:49:51 2018 +0300 @@ -1,4 +1,5 @@ using System; +using System.Diagnostics; using System.Threading; using Xunit; @@ -9,9 +10,20 @@ [Fact] public void Test1() { - using(var cts = new CancellationTokenSource(1000)) { - PromiseHelper.Sleep(10000, cts.Token).Join(); - } + var listener = new TextWriterTraceListener(Console.Out); + var source = new TraceSource("Custom",SourceLevels.ActivityTracing); + + source.Listeners.Add(listener); + + Trace.Listeners.Add(listener); + Trace.WriteLine("Hello!"); + Trace.CorrelationManager.StartLogicalOperation(); + Trace.WriteLine("Inner"); + foreach(var x in Trace.CorrelationManager.LogicalOperationStack) + Trace.WriteLine($"-{x}"); + source.TraceEvent(TraceEventType.Information, 1, "source event"); + source.TraceData(TraceEventType.Start, 1, DateTime.Now); + Trace.CorrelationManager.StopLogicalOperation(); } } }
--- a/Implab/Automaton/DFATable.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Automaton/DFATable.cs Sun Feb 11 00:49:51 2018 +0300 @@ -20,7 +20,7 @@ #region IDFADefinition implementation public bool IsFinalState(int s) { - Safe.ArgumentInRange(s, 0, m_stateCount, "s"); + Safe.ArgumentInRange(s >= 0 && s < m_stateCount, nameof(s)); return m_finalStates.Contains(s); } @@ -46,7 +46,7 @@ #endregion public void SetInitialState(int s) { - Safe.ArgumentAssert(s >= 0, "s"); + Safe.ArgumentInRange(s >= 0, nameof(s)); m_stateCount = Math.Max(m_stateCount, s + 1); m_initialState = s; } @@ -57,9 +57,9 @@ } public void Add(AutomatonTransition item) { - Safe.ArgumentAssert(item.s1 >= 0, "item"); - Safe.ArgumentAssert(item.s2 >= 0, "item"); - Safe.ArgumentAssert(item.edge >= 0, "item"); + Safe.ArgumentAssert(item.s1 >= 0, nameof(item)); + Safe.ArgumentAssert(item.s2 >= 0, nameof(item)); + Safe.ArgumentAssert(item.edge >= 0, nameof(item)); m_stateCount = Math.Max(m_stateCount, Math.Max(item.s1, item.s2) + 1); m_symbolCount = Math.Max(m_symbolCount, item.edge + 1);
--- a/Implab/Components/ExecutionState.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/ExecutionState.cs Sun Feb 11 00:49:51 2018 +0300 @@ -15,6 +15,8 @@ Stopping, + Stopped, + Failed, Disposed,
--- a/Implab/Components/IInitializable.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/IInitializable.cs Sun Feb 11 00:49:51 2018 +0300 @@ -11,9 +11,16 @@ /// Completes initialization. /// </summary> /// <remarks> + /// <para> /// Normally virtual methods shouldn't be called from the constructor, due to the incomplete object state, but /// they can be called from this method. This method is also usefull when we constructing a complex grpah /// of components where cyclic references may take place. + /// </para> + /// <para> + /// In asyncronous patterns <see cref="Initialize()"/> can be called + /// to start initialization and the <see cref="IRunnable.Completion"/> + /// property can be used to track operation completion. + /// </para> /// </remarks> void Initialize(); }
--- a/Implab/Components/IRunnable.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/IRunnable.cs Sun Feb 11 00:49:51 2018 +0300 @@ -6,23 +6,53 @@ /// <summary> /// Interface for the component which performs a long running task. /// </summary> - public interface IRunnable : IDisposable { + /// <remarks> + /// The access to the runnable component should be sequential, the + /// componet should support asynchronous completion of the initiated + /// operation but operations itself must be initiated sequentially. + /// </remarks> + public interface IRunnable { /// <summary> /// Starts this instance /// </summary> + /// <remarks> + /// This operation is cancellable and it's expected to move to + /// the failed state or just ignore the cancellation request, + /// </remarks> void Start(CancellationToken ct); /// <summary> - /// Stops this instance and releases all resources, after the instance is stopped it is moved to Disposed state and can't be reused. + /// Stops this instance and releases all resources, after the + /// instance is stopped it is moved to Disposed state and + /// can't be reused. /// </summary> + /// <remarks> + /// If the componet was in the starting state the pending operation + /// will be requested to cancel. The stop operatin will be + /// performed only if the component in the running state. + /// </remarks> void Stop(CancellationToken ct); - Task<ExecutionState> Completion { get; } + /// <summary> + /// The result of the last started operation. This property reflects + /// only the result of the last started operation and therefore should + /// change only if a new operation is initiated. + /// </summary> + Task Completion { get; } + /// <summary> + /// Current state of the componenet + /// </summary> ExecutionState State { get; } + /// <summary> + /// Event to monitor the state of the component. + /// </summary> event EventHandler<StateChangeEventArgs> StateChanged; + /// <summary> + /// The last error + /// </summary> Exception LastError { get; } } }
--- a/Implab/Components/ObjectPool.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/ObjectPool.cs Sun Feb 11 00:49:51 2018 +0300 @@ -26,7 +26,7 @@ } protected ObjectPool(int size) { - Safe.ArgumentInRange(size,1,size,"size"); + Safe.ArgumentInRange(size > 0, nameof(size)); m_size = size; }
--- a/Implab/Components/RunnableComponent.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Components/RunnableComponent.cs Sun Feb 11 00:49:51 2018 +0300 @@ -1,57 +1,273 @@ using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; -namespace Implab.Components -{ - public class RunnableComponent : IRunnable { +namespace Implab.Components { + /// <summary> + /// Base class for implementing components which support start and stop operations, + /// such components may represent running services. + /// </summary> + /// <remarks> + /// This class provides a basic lifecycle from the creation to the + /// termination of the component. + /// </remarks> + public class RunnableComponent : IRunnable, IInitializable, IDisposable { + + /// <summary> + /// This class bound <see cref="CancellationTokenSource"/> lifetime to the task, + /// when the task completes the associated token source will be disposed. + /// </summary> + class AsyncOperationDescriptor { + + public static AsyncOperationDescriptor None { get; } = new AsyncOperationDescriptor(); + + readonly CancellationTokenSource m_cts; + + bool m_done; + + public CancellationToken Token { + get { return m_cts == null ? CancellationToken.None : m_cts.Token; } + } + + public Task Task { get; private set; } + + private AsyncOperationDescriptor(Task task, CancellationTokenSource cts) { + m_cts = cts; + Task = Chain(task); + } + + private AsyncOperationDescriptor() { + Task = Task.CompletedTask; + } + public void Cancel() { + if (m_cts != null) { + lock (m_cts) { + if (!m_done) + m_cts.Cancel(); + } + } + } + + void Done() { + if (m_cts != null) { + lock (m_cts) { + m_done = true; + m_cts.Dispose(); + } + } else { + m_done = true; + } + } + + async Task Chain(Task other) { + try { + await other; + } finally { + Done(); + } + } + + public static AsyncOperationDescriptor Create(Func<CancellationToken, Task> factory, CancellationToken ct) { + var cts = ct.CanBeCanceled ? + CancellationTokenSource.CreateLinkedTokenSource(ct) : + new CancellationTokenSource(); + + return new AsyncOperationDescriptor(factory(cts.Token), cts); + } + + } + + // this lock is used to synchronize state flow of the component during + // completions or the operations. readonly object m_lock = new object(); - CancellationTokenSource m_cts; + // current operation cookie, used to check wheather a call to + // MoveSuccess/MoveFailed method belongs to the current + // operation, if cookies didn't match ignore completion result. + object m_cookie; - public Task<ExecutionState> Completion { - get; - private set; + AsyncOperationDescriptor m_current = AsyncOperationDescriptor.None; + + ExecutionState m_state; + + + protected RunnableComponent(bool initialized) { + State = initialized ? ExecutionState.Ready : ExecutionState.Created; } - public ExecutionState State => throw new NotImplementedException(); + public Task Completion { + get { return m_current.Task; } + } - public Exception LastError => throw new NotImplementedException(); + public ExecutionState State { + get { return m_state; } + private set { + if (m_state != value) { + m_state = value; + StateChanged.DispatchEvent(this, new StateChangeEventArgs { + State = value, + LastError = LastError + }); + } + } + } + + public Exception LastError { get; private set; } public event EventHandler<StateChangeEventArgs> StateChanged; + /// <summary> + /// Releases all resources used by the current component regardless of its + /// execution state. + /// </summary> + /// <remarks> + /// Calling to this method may result unexpedted results if the component + /// isn't in the stopped state. Call this method after the component is + /// stopped if needed or if the component is in the failed state. + /// </remarks> public void Dispose() { - lock(m_lock) { + bool dispose = false; + if (dispose) { Dispose(true); GC.SuppressFinalize(this); } } + ~RunnableComponent() { + Dispose(false); + } + + /// <summary> + /// Releases all resources used by the current component regardless of its + /// execution state. + /// </summary> + /// <param name="disposing">Indicates that the component is disposed + /// during a normal disposing or during GC.</param> protected virtual void Dispose(bool disposing) { - if (disposing) { - Safe.Dispose(m_cts); - } + } + + public void Initialize() { + var cookie = new object(); + if (MoveInitialize(cookie)) + ScheduleTask(InitializeInternalAsync, CancellationToken.None, cookie); + } + + /// <summary> + /// This method is used for initialization during a component creation. + /// </summary> + /// <param name="ct">A cancellation token for this operation</param> + /// <remarks> + /// This method should be used for short and mostly syncronous operations, + /// other operations which require time to run shoud be placed in + /// <see cref="StartInternal(CancellationToken)"/> method. + /// </remarks> + protected virtual Task InitializeInternalAsync(CancellationToken ct) { + return Task.CompletedTask; } public void Start(CancellationToken ct) { - lock(m_lock) { - switch (State) - { - - default: - throw new InvalidOperationException(); + var cookie = new object(); + if (MoveStart(cookie)) + ScheduleTask(StartInternal, ct, cookie); + } + + protected virtual Task StartInternal(CancellationToken ct) { + return Task.CompletedTask; + } + + public void Stop(CancellationToken ct) { + var cookie = new object(); + if (MoveStop(cookie)) + ScheduleTask(StopAsync, ct, cookie); + } + + async Task StopAsync(CancellationToken ct) { + m_current.Cancel(); + await Completion; + + ct.ThrowIfCancellationRequested(); + + await StopInternalAsync(ct); + } + + protected virtual Task StopInternalAsync(CancellationToken ct) { + return Task.CompletedTask; + } + + + #region state management + + bool MoveInitialize(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Created) + return false; + State = ExecutionState.Initializing; + m_cookie = cookie; + return true; + } + } + + bool MoveStart(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Ready) + return false; + State = ExecutionState.Starting; + m_cookie = cookie; + return true; + } + } + + bool MoveStop(object cookie) { + lock (m_lock) { + if (State != ExecutionState.Starting && State != ExecutionState.Running) + return false; + State = ExecutionState.Stopping; + m_cookie = cookie; + return true; + } + } + + void MoveSuccess(object cookie) { + lock (m_lock) { + if (m_cookie != cookie) + return; + switch (State) { + case ExecutionState.Initializing: + State = ExecutionState.Ready; + break; + case ExecutionState.Starting: + State = ExecutionState.Running; + break; + case ExecutionState.Stopping: + State = ExecutionState.Stopped; + break; } } } - public void Stop(CancellationToken ct) { - throw new NotImplementedException(); + void MoveFailed(Exception err, object cookie) { + lock (m_lock) { + if (m_cookie != cookie) + return; + LastError = err; + State = ExecutionState.Failed; + } } - protected virtual Task StartImpl(CancellationToken ct) { + - return Task.CompletedTask; + protected async void ScheduleTask(Func<CancellationToken, Task> next, CancellationToken ct, object cookie) { + try { + m_current = AsyncOperationDescriptor.Create(next, ct); + await m_current.Task; + MoveSuccess(cookie); + } catch (Exception e) { + MoveFailed(e, cookie); + } } + + #endregion } } \ No newline at end of file
--- a/Implab/Deferred.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Deferred.cs Sun Feb 11 00:49:51 2018 +0300 @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Threading.Tasks; namespace Implab { /// <summary> @@ -49,5 +50,15 @@ } } + public virtual void Resolve(Task thenable) { + if (thenable == null) + Reject(new Exception("The promise or task are expected")); + try { + thenable.Then(this); + } catch(Exception err) { + Reject(err); + } + } + } } \ No newline at end of file
--- a/Implab/Deferred`1.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Deferred`1.cs Sun Feb 11 00:49:51 2018 +0300 @@ -1,5 +1,6 @@ using System; using System.Diagnostics; +using System.Threading.Tasks; namespace Implab { public class Deferred<T> : IResolvable<T> { @@ -45,5 +46,16 @@ Reject(err); } } + + public virtual void Resolve(Task<T> thenable) { + if (thenable == null) + Reject(new Exception("The promise or task are expected")); + + try { + thenable.Then(this); + } catch (Exception err) { + Reject(err); + } + } } } \ No newline at end of file
--- a/Implab/Formats/Json/JsonStringScanner.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Formats/Json/JsonStringScanner.cs Sun Feb 11 00:49:51 2018 +0300 @@ -43,11 +43,8 @@ public static JsonStringScanner Create(string data, int offset, int length) { Safe.ArgumentNotNull(data, nameof(data)); - Safe.ArgumentGreaterThan(offset, 0, nameof(offset)); - Safe.ArgumentGreaterThan(length, 0, nameof(length)); - - if (offset + length > data.Length) - throw new ArgumentOutOfRangeException("Specified offset and length are out of the string bounds"); + Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset)); + Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length)); if (length <= _defaultBuffer) { var buffer = new char[length]; @@ -63,11 +60,8 @@ public static JsonStringScanner Create(char[] data, int offset, int length) { Safe.ArgumentNotNull(data, nameof(data)); - Safe.ArgumentGreaterThan(offset, 0, nameof(offset)); - Safe.ArgumentGreaterThan(length, 0, nameof(length)); - - if (offset + length > data.Length) - throw new ArgumentOutOfRangeException("Specified offset and length are out of the array bounds"); + Safe.ArgumentInRange(offset >= 0 && offset < data.Length , nameof(offset)); + Safe.ArgumentInRange(length >= 0 && offset + length <= data.Length, nameof(length)); return new JsonStringScanner(null, data, offset, offset + length, offset + length);
--- a/Implab/Messaging/IConsumer.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Messaging/IConsumer.cs Sun Feb 11 00:49:51 2018 +0300 @@ -3,7 +3,9 @@ namespace Implab.Messaging { public interface IConsumer<T> { - Task<T> Receive(CancellationToken ct); + T Receive(CancellationToken ct); + + Task<T> ReceiveAsync(CancellationToken ct); bool TryReceive(out T message); }
--- a/Implab/Messaging/IProducer.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Messaging/IProducer.cs Sun Feb 11 00:49:51 2018 +0300 @@ -4,8 +4,12 @@ namespace Implab.Messaging { public interface IProducer<T> { + void PostMessage(T message, CancellationToken ct); + Task PostMessageAsync(T message, CancellationToken ct); + void PostMessages(IEnumerable<T> messages, CancellationToken ct); + Task PostMessagesAsync(IEnumerable<T> messages, CancellationToken ct); } } \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Messaging/ISession.cs Sun Feb 11 00:49:51 2018 +0300 @@ -0,0 +1,14 @@ +namespace Implab.Messaging { + public interface ISession { + /// <summary> + /// Starts message consumers, call this method after all adapters are ready + /// </summary> + void Start(); + + /// <summary> + /// Stops message consumers + /// </summary> + void Stop(); + + } +} \ No newline at end of file
--- a/Implab/Parallels/BlockingQueue.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Parallels/BlockingQueue.cs Sun Feb 11 00:49:51 2018 +0300 @@ -54,7 +54,7 @@ } public T[] GetRange(int max, int timeout) { - Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + Safe.ArgumentInRange(max > 0 , nameof(max)); var buffer = new T[max]; int actual; @@ -83,7 +83,7 @@ } public T[] GetRange(int max) { - Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + Safe.ArgumentInRange(max > 0, nameof(max)); var buffer = new T[max]; int actual;
--- a/Implab/Safe.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Safe.cs Sun Feb 11 00:49:51 2018 +0300 @@ -6,6 +6,7 @@ using System.Diagnostics; using System.Collections; using System.Runtime.CompilerServices; +using System.Threading.Tasks; #if NET_4_5 using System.Threading.Tasks; @@ -48,14 +49,14 @@ } [MethodImpl(MethodImplOptions.AggressiveInlining)] - internal static void ArgumentGreaterThan(int value, int min, string paramName) { + internal static void ArgumentGreaterEqThan(int value, int min, string paramName) { if (value < min) throw new ArgumentOutOfRangeException(paramName); } [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static void ArgumentInRange(int value, int min, int max, string paramName) { - if (value < min || value > max) + public static void ArgumentInRange(bool condition, string paramName) { + if (!condition) throw new ArgumentOutOfRangeException(paramName); } @@ -144,6 +145,12 @@ public static void NoWait(IPromise promise) { } + public static void NoWait(Task promise) { + } + + public static void NoWait<T>(Task<T> promise) { + } + [DebuggerStepThrough] public static IPromise<T> Run<T>(Func<IPromise<T>> action) { ArgumentNotNull(action, "action");
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/TaskHelpers.cs Sun Feb 11 00:49:51 2018 +0300 @@ -0,0 +1,70 @@ +using System; +using System.Threading.Tasks; + +namespace Implab { + public static class TaskHelpers { + + public static async Task Then(this Task that, Action fulfilled, Action<Exception> rejected) { + Safe.ArgumentNotNull(that, nameof(that)); + if (rejected != null) { + try { + await that; + } catch (Exception e) { + rejected(e); + return; + } + } else { + await that; + } + + if (fulfilled != null) + fulfilled(); + } + + public static async Task Then(this Task that, Action fulfilled) { + Safe.ArgumentNotNull(that, nameof(that)); + await that; + if (fulfilled != null) + fulfilled(); + } + + public static async Task Then(this Task that, Func<Task> fulfilled) { + Safe.ArgumentNotNull(that, nameof(that)); + await that; + if (fulfilled != null) + await fulfilled(); + } + + public static async Task Finally(this Task that, Action handler) { + Safe.ArgumentNotNull(that, nameof(that)); + try { + await that; + } finally { + if (handler != null) + handler(); + } + } + + public static async void Then(this Task that, IResolvable next) { + try { + await that; + } catch (Exception e) { + next.Reject(e); + return; + } + next.Resolve(); + } + + public static async void Then<T>(this Task<T> that, IResolvable<T> next) { + T result; + try { + result = await that; + } catch (Exception e) { + next.Reject(e); + return; + } + next.Resolve(result); + } + + } +} \ No newline at end of file
--- a/Implab/Xml/JsonXmlReader.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Xml/JsonXmlReader.cs Sun Feb 11 00:49:51 2018 +0300 @@ -163,7 +163,7 @@ } public override string GetAttribute(int i) { - Safe.ArgumentInRange(i, 0, AttributeCount - 1, nameof(i)); + Safe.ArgumentInRange(i >= 0 && i < AttributeCount, nameof(i)); return m_attributes[i].Value; }
--- a/Implab/Xml/SerializationHelpers.cs Thu Feb 01 02:43:35 2018 +0300 +++ b/Implab/Xml/SerializationHelpers.cs Sun Feb 11 00:49:51 2018 +0300 @@ -38,6 +38,20 @@ SerializersPool<T>.Instance.Serialize(writer, obj); } + public static void SerializeToElementChild<T>(XmlElement element, T obj) { + using(var writer = element.CreateNavigator().AppendChild()) + SerializersPool<T>.Instance.Serialize(writer, obj); + } + + public static T Deserialize<T>(XmlReader reader) { + return SerializersPool<T>.Instance.Deserialize(reader); + } + + public static T DeserializeFromFile<T>(string file) { + using(var reader = XmlReader.Create(File.OpenText(file))) + return Deserialize<T>(reader); + } + public static T DeserializeFromString<T>(string data) { return SerializersPool<T>.Instance.DeserializeFromString(data); }