changeset 12:eb418ba8275b promises

refactoring, added WorkerPool
author cin
date Tue, 05 Nov 2013 19:55:34 +0400
parents 6ec82bf68c8e
children b0feb5b9ad1c
files Implab.suo Implab/ICancellable.cs Implab/IProgressHandler.cs Implab/IProgressNotifier.cs Implab/IPromise.cs Implab/ITaskController.cs Implab/Implab.csproj Implab/ManagedPromise.cs Implab/Parallels/WorkerPool.cs Implab/ProgressInitEventArgs.cs Implab/TaskController.cs Implab/ValueEventArgs.cs
diffstat 12 files changed, 226 insertions(+), 9 deletions(-) [+]
line wrap: on
line diff
Binary file Implab.suo has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ICancellable.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,10 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface ICancellable {
+        bool Cancel();
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IProgressHandler.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface IProgressHandler {
+        string Message {
+            get;
+            set;
+        }
+        float CurrentProgress {
+            get;
+            set;
+        }
+        void InitProgress(float current, float max, string message);
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IProgressNotifier.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,14 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab
+{
+    public interface IProgressNotifier
+    {
+        event EventHandler<ValueEventArgs<string>> MessageUpdated;
+        event EventHandler<ValueEventArgs<float>> ProgressUpdated;
+        EventHandler<ProgressInitEventArgs> ProgressInit;
+    }
+}
--- a/Implab/IPromise.cs	Tue Nov 05 01:09:58 2013 +0400
+++ b/Implab/IPromise.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -5,7 +5,7 @@
 
 namespace Implab
 {
-    public interface IPromise
+    public interface IPromise: ICancellable
     {
         /// <summary>
         /// Check whereather the promise has no more than one dependent promise.
@@ -24,12 +24,6 @@
         }
 
         /// <summary>
-        /// Tries to cancel the the complete chain of promises.
-        /// </summary>
-        /// <returns><c>true</c> -  if the promise has been cancelled, otherwise the promise will be resolved (or resolved already).</returns>
-        bool Cancel();
-
-        /// <summary>
         /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the
         /// handler will be invoked immediatelly.
         /// </summary>
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ITaskController.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface ITaskController: IProgressHandler {
+        bool Cancelled {
+            get;
+        }
+    }
+}
--- a/Implab/Implab.csproj	Tue Nov 05 01:09:58 2013 +0400
+++ b/Implab/Implab.csproj	Tue Nov 05 19:55:34 2013 +0400
@@ -32,7 +32,13 @@
     <Reference Include="System" />
   </ItemGroup>
   <ItemGroup>
+    <Compile Include="ICancellable.cs" />
+    <Compile Include="IProgressHandler.cs" />
+    <Compile Include="IProgressNotifier.cs" />
     <Compile Include="IPromise.cs" />
+    <Compile Include="ITaskController.cs" />
+    <Compile Include="ManagedPromise.cs" />
+    <Compile Include="Parallels\WorkerPool.cs" />
     <Compile Include="PromiseState.cs" />
     <Compile Include="TaskController.cs" />
     <Compile Include="ProgressInitEventArgs.cs" />
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/ManagedPromise.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+
+    public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier {
+        
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/WorkerPool.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -0,0 +1,131 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Diagnostics;
+
+namespace Implab.Parallels {
+    public class WorkerPool : IDisposable {
+        readonly int m_minThreads;
+        readonly int m_maxThreads;
+        int m_runningThreads;
+        object m_lock = new object();
+
+        bool m_disposed = false;
+        ManualResetEvent m_hasTasks = new ManualResetEvent(false);
+        Queue<Action> m_queue = new Queue<Action>();
+
+        public WorkerPool(int min, int max) {
+            if (min < 0)
+                throw new ArgumentOutOfRangeException("min");
+            if (min > max)
+                min = max;
+            m_minThreads = min;
+            m_maxThreads = max;
+
+            for (int i = 0; i < m_minThreads; i++)
+                StartWorker();
+        }
+
+        public Promise<T> Invoke<T>(Func<T> task) {
+            if (m_disposed)
+                throw new ObjectDisposedException(ToString());
+            if (task == null)
+                throw new ArgumentNullException("task");
+
+            var promise = new Promise<T>();
+
+
+
+            return promise;
+        }
+
+        bool StartWorker() {
+            var current = m_runningThreads;
+            // use spins to allocate slot for the new thread
+            do {
+                if (current >= m_maxThreads)
+                    // no more slots left
+                    return false;
+            } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current));
+
+            // slot successfully allocated
+
+            var worker = new Thread(this.Worker);
+            worker.Start();
+
+            return true;
+        }
+
+        void EnqueueTask(Action task) {
+            Debug.Assert(task != null);
+            lock (m_queue) {
+                m_queue.Enqueue(task);
+                m_hasTasks.Set();
+            }
+        }
+
+        bool FetchTask(out Action task) {
+            task = null;
+
+            while (true) {
+
+                m_hasTasks.WaitOne();
+
+                if (m_disposed)
+                    return false;
+
+                lock (m_queue) {
+                    if (m_queue.Count > 0) {
+                        task = m_queue.Dequeue();
+                        return true;
+                    }
+
+                    // no tasks left
+                    // signal that no more tasks left, lock ensures that this event won't suppress newly added task
+                    m_hasTasks.Reset();
+                }
+                
+                bool exit = true;
+
+                var current = m_runningThreads;
+                do {
+                    if (current <= m_minThreads) {
+                        exit = false; // this thread should return and wait for the new events
+                        break;
+                    }
+                } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current));
+
+                if (exit)
+                    return false;
+            }
+        }
+
+        void Worker() {
+            Action task;
+            while (FetchTask(out task))
+                task();
+        }
+
+        protected virtual void Dispose(bool disposing) {
+            if (disposing) {
+                lock (m_lock) {
+                    if (m_disposed)
+                        return;
+                    m_disposed = true;
+                }
+                m_hasTasks.Set();
+                GC.SuppressFinalize(this);
+            }
+        }
+
+        public void Dispose() {
+            Dispose(true);
+        }
+
+        ~WorkerPool() {
+            Dispose(false);
+        }
+    }
+}
--- a/Implab/ProgressInitEventArgs.cs	Tue Nov 05 01:09:58 2013 +0400
+++ b/Implab/ProgressInitEventArgs.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -5,7 +5,7 @@
 
 namespace Implab
 {
-
+    [Serializable]
     public class ProgressInitEventArgs: EventArgs
     {
         public float MaxProgress
--- a/Implab/TaskController.cs	Tue Nov 05 01:09:58 2013 +0400
+++ b/Implab/TaskController.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -12,7 +12,7 @@
     /// <remarks>
     /// Members of this object are thread safe.
     /// </remarks>
-    class TaskController
+    class TaskController: IProgressNotifier, ITaskController, ICancellable
     {
         readonly object m_lock;
         string m_message;
@@ -20,6 +20,8 @@
         float m_current;
         float m_max;
 
+        bool m_cancelled;
+
         public event EventHandler<ValueEventArgs<string>> MessageUpdated;
         public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
         public event EventHandler<ProgressInitEventArgs> ProgressInit;
@@ -82,6 +84,24 @@
             }
         }
 
+        public bool Cancelled {
+            get {
+                lock (m_lock)
+                    return m_cancelled;
+            }
+        }
+
+        public bool Cancel() {
+            lock (m_lock) {
+                if (!m_cancelled) {
+                    m_cancelled = true;
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
         protected virtual void OnMessageUpdated()
         {
             var temp = MessageUpdated;
--- a/Implab/ValueEventArgs.cs	Tue Nov 05 01:09:58 2013 +0400
+++ b/Implab/ValueEventArgs.cs	Tue Nov 05 19:55:34 2013 +0400
@@ -5,6 +5,7 @@
 
 namespace Implab
 {
+    [Serializable]
     public class ValueEventArgs<T>: EventArgs
     {
         public ValueEventArgs(T value)