Mercurial > pub > ImplabNet
changeset 12:eb418ba8275b promises
refactoring, added WorkerPool
author | cin |
---|---|
date | Tue, 05 Nov 2013 19:55:34 +0400 |
parents | 6ec82bf68c8e |
children | b0feb5b9ad1c |
files | Implab.suo Implab/ICancellable.cs Implab/IProgressHandler.cs Implab/IProgressNotifier.cs Implab/IPromise.cs Implab/ITaskController.cs Implab/Implab.csproj Implab/ManagedPromise.cs Implab/Parallels/WorkerPool.cs Implab/ProgressInitEventArgs.cs Implab/TaskController.cs Implab/ValueEventArgs.cs |
diffstat | 12 files changed, 226 insertions(+), 9 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ICancellable.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ICancellable { + bool Cancel(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IProgressHandler.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface IProgressHandler { + string Message { + get; + set; + } + float CurrentProgress { + get; + set; + } + void InitProgress(float current, float max, string message); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IProgressNotifier.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public interface IProgressNotifier + { + event EventHandler<ValueEventArgs<string>> MessageUpdated; + event EventHandler<ValueEventArgs<float>> ProgressUpdated; + EventHandler<ProgressInitEventArgs> ProgressInit; + } +}
--- a/Implab/IPromise.cs Tue Nov 05 01:09:58 2013 +0400 +++ b/Implab/IPromise.cs Tue Nov 05 19:55:34 2013 +0400 @@ -5,7 +5,7 @@ namespace Implab { - public interface IPromise + public interface IPromise: ICancellable { /// <summary> /// Check whereather the promise has no more than one dependent promise. @@ -24,12 +24,6 @@ } /// <summary> - /// Tries to cancel the the complete chain of promises. - /// </summary> - /// <returns><c>true</c> - if the promise has been cancelled, otherwise the promise will be resolved (or resolved already).</returns> - bool Cancel(); - - /// <summary> /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the /// handler will be invoked immediatelly. /// </summary>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ITaskController.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ITaskController: IProgressHandler { + bool Cancelled { + get; + } + } +}
--- a/Implab/Implab.csproj Tue Nov 05 01:09:58 2013 +0400 +++ b/Implab/Implab.csproj Tue Nov 05 19:55:34 2013 +0400 @@ -32,7 +32,13 @@ <Reference Include="System" /> </ItemGroup> <ItemGroup> + <Compile Include="ICancellable.cs" /> + <Compile Include="IProgressHandler.cs" /> + <Compile Include="IProgressNotifier.cs" /> <Compile Include="IPromise.cs" /> + <Compile Include="ITaskController.cs" /> + <Compile Include="ManagedPromise.cs" /> + <Compile Include="Parallels\WorkerPool.cs" /> <Compile Include="PromiseState.cs" /> <Compile Include="TaskController.cs" /> <Compile Include="ProgressInitEventArgs.cs" />
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ManagedPromise.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + + public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { + + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/WorkerPool.cs Tue Nov 05 19:55:34 2013 +0400 @@ -0,0 +1,131 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public class WorkerPool : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads; + object m_lock = new object(); + + bool m_disposed = false; + ManualResetEvent m_hasTasks = new ManualResetEvent(false); + Queue<Action> m_queue = new Queue<Action>(); + + public WorkerPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public Promise<T> Invoke<T>(Func<T> task) { + if (m_disposed) + throw new ObjectDisposedException(ToString()); + if (task == null) + throw new ArgumentNullException("task"); + + var promise = new Promise<T>(); + + + + return promise; + } + + bool StartWorker() { + var current = m_runningThreads; + // use spins to allocate slot for the new thread + do { + if (current >= m_maxThreads) + // no more slots left + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + // slot successfully allocated + + var worker = new Thread(this.Worker); + worker.Start(); + + return true; + } + + void EnqueueTask(Action task) { + Debug.Assert(task != null); + lock (m_queue) { + m_queue.Enqueue(task); + m_hasTasks.Set(); + } + } + + bool FetchTask(out Action task) { + task = null; + + while (true) { + + m_hasTasks.WaitOne(); + + if (m_disposed) + return false; + + lock (m_queue) { + if (m_queue.Count > 0) { + task = m_queue.Dequeue(); + return true; + } + + // no tasks left + // signal that no more tasks left, lock ensures that this event won't suppress newly added task + m_hasTasks.Reset(); + } + + bool exit = true; + + var current = m_runningThreads; + do { + if (current <= m_minThreads) { + exit = false; // this thread should return and wait for the new events + break; + } + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + if (exit) + return false; + } + } + + void Worker() { + Action task; + while (FetchTask(out task)) + task(); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + lock (m_lock) { + if (m_disposed) + return; + m_disposed = true; + } + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + + public void Dispose() { + Dispose(true); + } + + ~WorkerPool() { + Dispose(false); + } + } +}
--- a/Implab/ProgressInitEventArgs.cs Tue Nov 05 01:09:58 2013 +0400 +++ b/Implab/ProgressInitEventArgs.cs Tue Nov 05 19:55:34 2013 +0400 @@ -5,7 +5,7 @@ namespace Implab { - + [Serializable] public class ProgressInitEventArgs: EventArgs { public float MaxProgress
--- a/Implab/TaskController.cs Tue Nov 05 01:09:58 2013 +0400 +++ b/Implab/TaskController.cs Tue Nov 05 19:55:34 2013 +0400 @@ -12,7 +12,7 @@ /// <remarks> /// Members of this object are thread safe. /// </remarks> - class TaskController + class TaskController: IProgressNotifier, ITaskController, ICancellable { readonly object m_lock; string m_message; @@ -20,6 +20,8 @@ float m_current; float m_max; + bool m_cancelled; + public event EventHandler<ValueEventArgs<string>> MessageUpdated; public event EventHandler<ValueEventArgs<float>> ProgressUpdated; public event EventHandler<ProgressInitEventArgs> ProgressInit; @@ -82,6 +84,24 @@ } } + public bool Cancelled { + get { + lock (m_lock) + return m_cancelled; + } + } + + public bool Cancel() { + lock (m_lock) { + if (!m_cancelled) { + m_cancelled = true; + return true; + } else { + return false; + } + } + } + protected virtual void OnMessageUpdated() { var temp = MessageUpdated;