Mercurial > pub > ImplabNet
changeset 18:0c924dff5498
Слияние с promises
author | cin |
---|---|
date | Fri, 08 Nov 2013 01:27:04 +0400 (2013-11-07) |
parents | dfa21d507bc5 (current diff) 7cd4a843b4e4 (diff) |
children | f0568ff069a5 |
files | Implab/AsyncPool.cs |
diffstat | 24 files changed, 1538 insertions(+), 233 deletions(-) [+] |
line wrap: on
line diff
--- a/.hgignore Mon Oct 21 02:34:31 2013 +0400 +++ b/.hgignore Fri Nov 08 01:27:04 2013 +0400 @@ -10,3 +10,4 @@ Implab.Fx/bin/ Implab.Fx.Test/bin/ Implab.Fx.Test/obj/ +_ReSharper.Implab/
--- a/Implab.Test/AsyncTests.cs Mon Oct 21 02:34:31 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Fri Nov 08 01:27:04 2013 +0400 @@ -1,101 +1,333 @@ using System; using Microsoft.VisualStudio.TestTools.UnitTesting; -using Implab; using System.Reflection; using System.Threading; +using Implab.Parallels; -namespace Implab.Tests -{ - [TestClass] - public class AsyncTests - { - [TestMethod] - public void ResolveTest () - { - int res = -1; - var p = new Promise<int> (); - p.Then (x => res = x); - p.Resolve (100); +namespace Implab.Test { + [TestClass] + public class AsyncTests { + [TestMethod] + public void ResolveTest() { + int res = -1; + var p = new Promise<int>(); + p.Then(x => res = x); + p.Resolve(100); + + Assert.AreEqual(res, 100); + } + + [TestMethod] + public void RejectTest() { + int res = -1; + Exception err = null; + + var p = new Promise<int>(); + p.Then(x => res = x, e => err = e); + p.Reject(new ApplicationException("error")); + + Assert.AreEqual(res, -1); + Assert.AreEqual(err.Message, "error"); + + } + + [TestMethod] + public void JoinSuccessTest() { + var p = new Promise<int>(); + p.Resolve(100); + Assert.AreEqual(p.Join(), 100); + } - Assert.AreEqual (res, 100); - } + [TestMethod] + public void JoinFailTest() { + var p = new Promise<int>(); + p.Reject(new ApplicationException("failed")); + + try { + p.Join(); + throw new ApplicationException("WRONG!"); + } catch (TargetInvocationException err) { + Assert.AreEqual(err.InnerException.Message, "failed"); + } catch { + Assert.Fail("Got wrong excaption"); + } + } + + [TestMethod] + public void MapTest() { + var p = new Promise<int>(); + + var p2 = p.Map(x => x.ToString()); + p.Resolve(100); + + Assert.AreEqual(p2.Join(), "100"); + } + + [TestMethod] + public void FixErrorTest() { + var p = new Promise<int>(); + + var p2 = p.Error(e => 101); + + p.Reject(new Exception()); + + Assert.AreEqual(p2.Join(), 101); + } [TestMethod] - public void RejectTest () - { - int res = -1; - Exception err = null; + public void ChainTest() { + var p1 = new Promise<int>(); + + var p3 = p1.Chain(x => { + var p2 = new Promise<string>(); + p2.Resolve(x.ToString()); + return p2; + }); + + p1.Resolve(100); - var p = new Promise<int> (); - p.Then (x => res = x, e => err = e); - p.Reject (new ApplicationException ("error")); + Assert.AreEqual(p3.Join(), "100"); + } - Assert.AreEqual (res, -1); - Assert.AreEqual (err.Message, "error"); + [TestMethod] + public void PoolTest() { + var pid = Thread.CurrentThread.ManagedThreadId; + var p = AsyncPool.Invoke(() => Thread.CurrentThread.ManagedThreadId); - } + Assert.AreNotEqual(pid, p.Join()); + } [TestMethod] - public void JoinSuccessTest () - { - var p = new Promise<int> (); - p.Resolve (100); - Assert.AreEqual (p.Join (), 100); - } + public void WorkerPoolSizeTest() { + var pool = new WorkerPool(5, 10, 0); + + Assert.AreEqual(5, pool.ThreadCount); + + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + + Assert.AreEqual(5, pool.ThreadCount); + + for (int i = 0; i < 100; i++) + pool.Invoke(() => { Thread.Sleep(1000000); return 10; }); + Thread.Sleep(100); + Assert.AreEqual(10, pool.ThreadCount); + + pool.Dispose(); + } + + [TestMethod] + public void WorkerPoolCorrectTest() { + var pool = new WorkerPool(0,1000,100); + + int iterations = 1000; + int pending = iterations; + var stop = new ManualResetEvent(false); + + var count = 0; + for (int i = 0; i < iterations; i++) { + pool + .Invoke(() => 1) + .Then(x => Interlocked.Add(ref count, x)) + .Then(x => Math.Log10(x)) + .Anyway(() => { + Interlocked.Decrement(ref pending); + if (pending == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(iterations, count); + Console.WriteLine("Max threads: {0}", pool.MaxRunningThreads); + pool.Dispose(); + + } + + [TestMethod] + public void WorkerPoolDisposeTest() { + var pool = new WorkerPool(5, 20); + Assert.AreEqual(5, pool.ThreadCount); + pool.Dispose(); + Thread.Sleep(100); + Assert.AreEqual(0, pool.ThreadCount); + pool.Dispose(); + } [TestMethod] - public void JoinFailTest () - { - var p = new Promise<int> (); - p.Reject (new ApplicationException ("failed")); + public void MTQueueTest() { + var queue = new MTQueue<int>(); + int res; + + queue.Enqueue(10); + Assert.IsTrue(queue.TryDequeue(out res)); + Assert.AreEqual(10, res); + Assert.IsFalse(queue.TryDequeue(out res)); + + for (int i = 0; i < 1000; i++) + queue.Enqueue(i); + + for (int i = 0; i < 1000; i++) { + queue.TryDequeue(out res); + Assert.AreEqual(i, res); + } + + int writers = 0; + int readers = 0; + var stop = new ManualResetEvent(false); + int total = 0; + + int itemsPerWriter = 1000; + int writersCount = 3; - try { - p.Join (); - throw new ApplicationException ("WRONG!"); - } catch (TargetInvocationException err) { - Assert.AreEqual (err.InnerException.Message, "failed"); - } catch { - Assert.Fail ("Got wrong excaption"); - } - } + for (int i = 0; i < writersCount; i++) { + Interlocked.Increment(ref writers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + for (int ii = 0; ii < itemsPerWriter; ii++) { + queue.Enqueue(1); + } + return 1; + }) + .Anyway(() => Interlocked.Decrement(ref writers)); + } + + for (int i = 0; i < 10; i++) { + Interlocked.Increment(ref readers); + var wn = i; + AsyncPool + .InvokeNewThread(() => { + int t; + do { + while (queue.TryDequeue(out t)) + Interlocked.Add(ref total, t); + } while (writers > 0); + return 1; + }) + .Anyway(() => { + Interlocked.Decrement(ref readers); + if (readers == 0) + stop.Set(); + }); + } + + stop.WaitOne(); + + Assert.AreEqual(itemsPerWriter * writersCount, total); + } [TestMethod] - public void MapTest () - { - var p = new Promise<int> (); + public void ParallelMapTest() { + + int count = 100000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); - var p2 = p.Map (x => x.ToString ()); - p.Resolve (100); + var t = Environment.TickCount; + var res = args.ParallelMap(x => Math.Sin(x*x), 4).Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); - Assert.AreEqual (p2.Join (), "100"); - } + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } [TestMethod] - public void ChainTest () - { - var p1 = new Promise<int> (); + public void ChainedMapTest() { + + using (var pool = new WorkerPool(8,100,0)) { + int count = 10000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); - var p3 = p1.Chain (x => { - var p2 = new Promise<string> (); - p2.Resolve (x.ToString ()); - return p2; - }); + var t = Environment.TickCount; + var res = args + .ChainedMap( + x => pool.Invoke( + () => Math.Sin(x * x) + ), + 4 + ) + .Join(); - p1.Resolve (100); + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); - Assert.AreEqual (p3.Join (), "100"); - } + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); + } + } [TestMethod] - public void PoolTest () - { - var pid = Thread.CurrentThread.ManagedThreadId; - var p = AsyncPool.Invoke (() => { - return Thread.CurrentThread.ManagedThreadId; - }); + public void ParallelForEachTest() { + + int count = 100000; + + int[] args = new int[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = (int)(rand.NextDouble() * 100); + + int result = 0; + + var t = Environment.TickCount; + args.ParallelForEach(x => Interlocked.Add(ref result, x), 4).Join(); + + Console.WriteLine("Iteration complete in {0} ms, result: {1}", Environment.TickCount - t, result); + + int result2 = 0; + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + result2 += args[i]; + Assert.AreEqual(result2, result); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + } - Assert.AreNotEqual (pid, p.Join ()); - } - } + [TestMethod] + public void ComplexCase1Test() { + var flags = new bool[3]; + + // op1 (aync 200ms) => op2 (async 200ms) => op3 (sync map) + + var p = PromiseHelper + .Sleep(200, "Alan") + .Cancelled(() => flags[0] = true) + .Chain(x => + PromiseHelper + .Sleep(200, "Hi, " + x) + .Map(y => y) + .Cancelled(() => flags[1] = true) + ) + .Cancelled(() => flags[2] = true); + Thread.Sleep(300); + p.Cancel(); + try { + Assert.AreEqual(p.Join(), "Hi, Alan"); + Assert.Fail("Shouldn't get here"); + } catch (OperationCanceledException) { + } + + Assert.IsFalse(flags[0]); + Assert.IsTrue(flags[1]); + Assert.IsTrue(flags[2]); + } + } }
--- a/Implab.Test/Implab.Test.csproj Mon Oct 21 02:34:31 2013 +0400 +++ b/Implab.Test/Implab.Test.csproj Fri Nov 08 01:27:04 2013 +0400 @@ -46,6 +46,7 @@ </ItemGroup> <ItemGroup> <Compile Include="AsyncTests.cs" /> + <Compile Include="PromiseHelper.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> </ItemGroup> <ItemGroup>
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab.Test/PromiseHelper.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,17 @@ +using Implab.Parallels; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Test { + class PromiseHelper { + public static Promise<T> Sleep<T>(int timeout, T retVal) { + return AsyncPool.Invoke(() => { + Thread.Sleep(timeout); + return retVal; + }); + } + } +}
--- a/Implab/AsyncPool.cs Mon Oct 21 02:34:31 2013 +0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,28 +0,0 @@ -using System; -using System.Threading; - -namespace Implab { - /// <summary> - /// Класс для распаралеливания задач. - /// </summary> - /// <remarks> - /// Используя данный класс и лямда выражения можно распараллелить - /// вычисления, для этого используется концепция обещаний. - /// </remarks> - public static class AsyncPool { - - public static Promise<T> Invoke<T>(Func<T> func) { - var p = new Promise<T>(); - - ThreadPool.QueueUserWorkItem(param => { - try { - p.Resolve(func()); - } catch(Exception e) { - p.Reject(e); - } - }); - - return p; - } - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ICancellable.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ICancellable { + bool Cancel(); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IProgressHandler.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface IProgressHandler { + string Message { + get; + set; + } + float CurrentProgress { + get; + set; + } + void InitProgress(float current, float max, string message); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IProgressNotifier.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public interface IProgressNotifier + { + event EventHandler<ValueEventArgs<string>> MessageUpdated; + event EventHandler<ValueEventArgs<float>> ProgressUpdated; + event EventHandler<ProgressInitEventArgs> ProgressInit; + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/IPromise.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public interface IPromise: ICancellable + { + /// <summary> + /// Check whereather the promise has no more than one dependent promise. + /// </summary> + bool IsExclusive + { + get; + } + + /// <summary> + /// The current state of the promise. + /// </summary> + PromiseState State + { + get; + } + + /// <summary> + /// Registers handler for the case when the promise is cencelled. If the promise already cancelled the + /// handler will be invoked immediatelly. + /// </summary> + /// <param name="handler">The handler</param> + void HandleCancelled(Action handler); + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ITaskController.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + public interface ITaskController: IProgressHandler { + bool Cancelled { + get; + } + } +}
--- a/Implab/Implab.csproj Mon Oct 21 02:34:31 2013 +0400 +++ b/Implab/Implab.csproj Fri Nov 08 01:27:04 2013 +0400 @@ -32,13 +32,25 @@ <Reference Include="System" /> </ItemGroup> <ItemGroup> + <Compile Include="ICancellable.cs" /> + <Compile Include="IProgressHandler.cs" /> + <Compile Include="IProgressNotifier.cs" /> + <Compile Include="IPromise.cs" /> + <Compile Include="ITaskController.cs" /> + <Compile Include="ManagedPromise.cs" /> + <Compile Include="Parallels\DispatchPool.cs" /> + <Compile Include="Parallels\ArrayTraits.cs" /> + <Compile Include="Parallels\MTQueue.cs" /> + <Compile Include="Parallels\WorkerPool.cs" /> + <Compile Include="PromiseState.cs" /> + <Compile Include="TaskController.cs" /> + <Compile Include="ProgressInitEventArgs.cs" /> <Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Promise.cs" /> - <Compile Include="AsyncPool.cs" /> + <Compile Include="Parallels\AsyncPool.cs" /> <Compile Include="Safe.cs" /> + <Compile Include="ValueEventArgs.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> - <ItemGroup> - <Folder Include="Parallels\" /> - </ItemGroup> + <ItemGroup /> </Project> \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ManagedPromise.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab { + + /*public class ManagedPromise<T>: Promise<T>, ITaskController, IProgressNotifier { + + }*/ +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/ArrayTraits.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,171 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public static class ArrayTraits { + class ArrayIterator<TSrc> : DispatchPool<int> { + readonly Action<TSrc> m_action; + readonly TSrc[] m_source; + readonly Promise<int> m_promise = new Promise<int>(); + + int m_pending; + int m_next; + + public ArrayIterator(TSrc[] source, Action<TSrc> action, int threads) + : base(threads) { + + Debug.Assert(source != null); + Debug.Assert(action != null); + + m_next = 0; + m_source = source; + m_pending = source.Length; + m_action = action; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<int> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_action(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_source.Length); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + class ArrayMapper<TSrc, TDst>: DispatchPool<int> { + readonly Func<TSrc, TDst> m_transform; + readonly TSrc[] m_source; + readonly TDst[] m_dest; + readonly Promise<TDst[]> m_promise = new Promise<TDst[]>(); + + int m_pending; + int m_next; + + public ArrayMapper(TSrc[] source, Func<TSrc, TDst> transform, int threads) + : base(threads) { + + Debug.Assert (source != null); + Debug.Assert( transform != null); + + m_next = 0; + m_source = source; + m_dest = new TDst[source.Length]; + m_pending = source.Length; + m_transform = transform; + + m_promise.Anyway(() => Dispose()); + m_promise.Cancelled(() => Dispose()); + + InitPool(); + } + + public Promise<TDst[]> Promise { + get { + return m_promise; + } + } + + protected override bool TryDequeue(out int unit) { + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; + } + + protected override void InvokeUnit(int unit) { + try { + m_dest[unit] = m_transform(m_source[unit]); + var pending = Interlocked.Decrement(ref m_pending); + if (pending == 0) + m_promise.Resolve(m_dest); + } catch (Exception e) { + m_promise.Reject(e); + } + } + } + + public static Promise<TDst[]> ParallelMap<TSrc, TDst> (this TSrc[] source, Func<TSrc,TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + + var mapper = new ArrayMapper<TSrc, TDst>(source, transform, threads); + return mapper.Promise; + } + + public static Promise<int> ParallelForEach<TSrc>(this TSrc[] source, Action<TSrc> action, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (action == null) + throw new ArgumentNullException("action"); + + var iter = new ArrayIterator<TSrc>(source, action, threads); + return iter.Promise; + } + + public static Promise<TDst[]> ChainedMap<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise<TDst[]>(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/AsyncPool.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,44 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + /// <summary> + /// Класс для распаралеливания задач. + /// </summary> + /// <remarks> + /// Используя данный класс и лямда выражения можно распараллелить + /// вычисления, для этого используется концепция обещаний. + /// </remarks> + public static class AsyncPool { + + public static Promise<T> Invoke<T>(Func<T> func) { + var p = new Promise<T>(); + + ThreadPool.QueueUserWorkItem(param => { + try { + p.Resolve(func()); + } catch(Exception e) { + p.Reject(e); + } + }); + + return p; + } + + public static Promise<T> InvokeNewThread<T>(Func<T> func) { + var p = new Promise<T>(); + + var worker = new Thread(() => { + try { + p.Resolve(func()); + } catch (Exception e) { + p.Reject(e); + } + }); + worker.IsBackground = true; + worker.Start(); + + return p; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/DispatchPool.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,238 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public abstract class DispatchPool<TUnit> : IDisposable { + readonly int m_minThreads; + readonly int m_maxThreads; + int m_runningThreads = 0; + int m_maxRunningThreads = 0; + int m_suspended = 0; + int m_exitRequired = 0; + AutoResetEvent m_hasTasks = new AutoResetEvent(false); + + protected DispatchPool(int min, int max) { + if (min < 0) + throw new ArgumentOutOfRangeException("min"); + if (max <= 0) + throw new ArgumentOutOfRangeException("max"); + + if (min > max) + min = max; + m_minThreads = min; + m_maxThreads = max; + } + + protected DispatchPool(int threads) + : this(threads, threads) { + } + + protected DispatchPool() { + int maxThreads, maxCP; + ThreadPool.GetMaxThreads(out maxThreads, out maxCP); + + m_minThreads = 0; + m_maxThreads = maxThreads; + } + + protected void InitPool() { + for (int i = 0; i < m_minThreads; i++) + StartWorker(); + } + + public int ThreadCount { + get { + return m_runningThreads; + } + } + + public int MaxRunningThreads { + get { + return m_maxRunningThreads; + } + } + + protected bool IsDisposed { + get { + return m_exitRequired != 0; + } + } + + protected abstract bool TryDequeue(out TUnit unit); + + protected virtual bool ExtendPool() { + if (m_suspended > 0) { + m_hasTasks.Set(); + return true; + } else + return StartWorker(); + } + + /// <summary> + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// </summary> + protected void WakePool() { + m_hasTasks.Set(); // wake sleeping thread; + + if (AllocateThreadSlot(1)) { + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + + #region thread slots traits + + bool AllocateThreadSlot() { + int current; + // use spins to allocate slot for the new thread + do { + current = m_runningThreads; + if (current >= m_maxThreads || m_exitRequired != 0) + // no more slots left or the pool has been disposed + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current + 1, current)); + + UpdateMaxThreads(current + 1); + + return true; + } + + bool AllocateThreadSlot(int desired) { + if (desired - 1 != Interlocked.CompareExchange(ref m_runningThreads, desired, desired - 1)) + return false; + + UpdateMaxThreads(desired); + + return true; + } + + bool ReleaseThreadSlot(out bool last) { + last = false; + int current; + // use spins to release slot for the new thread + do { + current = m_runningThreads; + if (current <= m_minThreads && m_exitRequired == 0) + // the thread is reserved + return false; + } while (current != Interlocked.CompareExchange(ref m_runningThreads, current - 1, current)); + + last = (current == 1); + + return true; + } + + /// <summary> + /// releases thread slot unconditionally, used during cleanup + /// </summary> + /// <returns>true - no more threads left</returns> + bool ReleaseThreadSlotAnyway() { + var left = Interlocked.Decrement(ref m_runningThreads); + return left == 0; + } + + void UpdateMaxThreads(int count) { + int max; + do { + max = m_maxRunningThreads; + if (max >= count) + break; + } while(max != Interlocked.CompareExchange(ref m_maxRunningThreads, count, max)); + } + + #endregion + + bool StartWorker() { + if (AllocateThreadSlot()) { + // slot successfully allocated + var worker = new Thread(this.Worker); + worker.IsBackground = true; + worker.Start(); + + return true; + } else { + return false; + } + } + + bool FetchTask(out TUnit unit) { + do { + // exit if requested + if (m_exitRequired != 0) { + // release the thread slot + if (ReleaseThreadSlotAnyway()) // it was the last worker + m_hasTasks.Dispose(); + else + m_hasTasks.Set(); // wake next worker + unit = default(TUnit); + return false; + } + + // fetch task + if (TryDequeue(out unit)) { + ExtendPool(); + return true; + } + + //no tasks left, exit if the thread is no longer needed + bool last; + if (ReleaseThreadSlot(out last)) { + if (last && m_hasTasks.WaitOne(0)) { + if (AllocateThreadSlot(1)) + continue; // spin again... + else + // we failed to reallocate slot for this thread + // therefore we need to release the event + m_hasTasks.Set(); + } + + return false; + } + + // entering suspend state + Interlocked.Increment(ref m_suspended); + // keep this thread and wait + Suspend(); + Interlocked.Decrement(ref m_suspended); + } while (true); + } + + protected abstract void InvokeUnit(TUnit unit); + + void Worker() { + TUnit unit; + while (FetchTask(out unit)) + InvokeUnit(unit); + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + if (m_exitRequired == 0) { + if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) + return; + + // wake sleeping threads + m_hasTasks.Set(); + GC.SuppressFinalize(this); + } + } + } + + public void Dispose() { + Dispose(true); + } + + ~DispatchPool() { + Dispose(false); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/MTQueue.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab.Parallels { + public class MTQueue<T> { + class Node { + public Node(T value) { + this.value = value; + } + public readonly T value; + public Node next; + } + + Node m_first; + Node m_last; + + public void Enqueue(T value) { + var last = m_last; + var next = new Node(value); + + while (last != Interlocked.CompareExchange(ref m_last, next, last)) + last = m_last; + + if (last != null) + last.next = next; + else + m_first = next; + } + + public bool TryDequeue(out T value) { + Node first; + Node next = null; + value = default(T); + + do { + first = m_first; + if (first == null) + return false; + next = first.next; + if (next == null) { + // this is the last element, + // then try to update tail + if (first != Interlocked.CompareExchange(ref m_last, null, first)) { + // this is inconsistent situation which means that the queue is empty + if (m_last == null) + return false; + // tail has been changed, that means that we need to restart + continue; + } + + // tail succesfully updated and first.next will never be changed + // other readers will fail due to inconsistency m_last != m_fist, but m_first.next == null + // but the writer may update the m_first since the m_last is null + + // so we need to fix inconsistency by setting m_first to null, but if it already has been + // updated by a writer then we should just give up + Interlocked.CompareExchange(ref m_first, null, first); + break; + + } else { + if (first == Interlocked.CompareExchange(ref m_first, next, first)) + // head succesfully updated + break; + } + } while (true); + + value = first.value; + return true; + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/WorkerPool.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,89 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + public class WorkerPool : DispatchPool<Action> { + + MTQueue<Action> m_queue = new MTQueue<Action>(); + int m_queueLength = 0; + readonly int m_threshold = 1; + + public WorkerPool(int minThreads, int maxThreads, int threshold) + : base(minThreads, maxThreads) { + m_threshold = threshold; + InitPool(); + } + + public WorkerPool(int minThreads, int maxThreads) : + base(minThreads, maxThreads) { + InitPool(); + } + + public WorkerPool(int threads) + : base(threads) { + InitPool(); + } + + public WorkerPool() + : base() { + InitPool(); + } + + public Promise<T> Invoke<T>(Func<T> task) { + if (task == null) + throw new ArgumentNullException("task"); + if (IsDisposed) + throw new ObjectDisposedException(ToString()); + + var promise = new Promise<T>(); + + EnqueueTask(delegate() { + try { + promise.Resolve(task()); + } catch (Exception e) { + promise.Reject(e); + } + }); + + return promise; + } + + protected void EnqueueTask(Action unit) { + Debug.Assert(unit != null); + var len = Interlocked.Increment(ref m_queueLength); + m_queue.Enqueue(unit); + + if(!ExtendPool()) + WakePool(); + } + + protected override bool ExtendPool() { + if (m_queueLength <= m_threshold*ThreadCount) + // in this case we are in active thread and it request for additional workers + // satisfy it only when queue is longer than threshold + return false; + return base.ExtendPool(); + } + + protected override bool TryDequeue(out Action unit) { + if (m_queue.TryDequeue(out unit)) { + Interlocked.Decrement(ref m_queueLength); + return true; + } + return false; + } + + protected override void InvokeUnit(Action unit) { + unit(); + } + + protected override void Suspend() { + if (m_queueLength == 0) + base.Suspend(); + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ProgressInitEventArgs.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + [Serializable] + public class ProgressInitEventArgs: EventArgs + { + public float MaxProgress + { + get; + private set; + } + + public float CurrentProgress + { + get; + private set; + } + + public string Message + { + get; + private set; + } + + public ProgressInitEventArgs(float current, float max, string message) + { + this.MaxProgress = max; + this.CurrentProgress = current; + this.Message = message; + } + } +}
--- a/Implab/Promise.cs Mon Oct 21 02:34:31 2013 +0400 +++ b/Implab/Promise.cs Fri Nov 08 01:27:04 2013 +0400 @@ -1,18 +1,16 @@ using System; using System.Collections.Generic; -using System.Linq; using System.Reflection; -using System.Text; using System.Diagnostics; using System.Threading; namespace Implab { public delegate void ErrorHandler(Exception e); - - public delegate void ResultHandler<T>(T result); - public delegate TNew ResultMapper<TSrc, TNew>(TSrc result); - public delegate Promise<TNew> ChainedOperation<TSrc, TNew>(TSrc result); + public delegate T ErrorHandler<out T>(Exception e); + public delegate void ResultHandler<in T>(T result); + public delegate TNew ResultMapper<in TSrc, out TNew>(TSrc result); + public delegate Promise<TNew> ChainedOperation<in TSrc, TNew>(TSrc result); /// <summary> /// Класс для асинхронного получения результатов. Так называемое "обещание". @@ -48,23 +46,23 @@ /// только инициатор обещания иначе могут возникнуть противоречия. /// </para> /// </remarks> - public class Promise<T> { + public class Promise<T> : IPromise { struct ResultHandlerInfo { public ResultHandler<T> resultHandler; public ErrorHandler errorHandler; } - enum State { - Unresolved, - Resolving, - Resolved, - Cancelled - } + readonly IPromise m_parent; + + LinkedList<ResultHandlerInfo> m_resultHandlers = new LinkedList<ResultHandlerInfo>(); + LinkedList<Action> m_cancelHandlers = new LinkedList<Action>(); - LinkedList<ResultHandlerInfo> m_handlersChain = new LinkedList<ResultHandlerInfo>(); - State m_state; - bool m_cancellable; + readonly object m_lock = new Object(); + readonly bool m_cancellable; + int m_childrenCount = 0; + + PromiseState m_state; T m_result; Exception m_error; @@ -72,13 +70,17 @@ m_cancellable = true; } - /// <summary> - /// Событие, возникающее при отмене асинхронной операции. - /// </summary> - /// <description> - /// Как правило используется для оповещения объекта, выполняющего асинхронную операцию, о том, что ее следует отменить. - /// </description> - public event EventHandler Cancelled; + public Promise(IPromise parent, bool cancellable) { + m_cancellable = cancellable; + m_parent = parent; + if (parent != null) + parent.HandleCancelled(InternalCancel); + } + + void InternalCancel() { + // don't try to cancel parent :) + Cancel(false); + } /// <summary> /// Выполняет обещание, сообщая об успешном выполнении. @@ -86,38 +88,39 @@ /// <param name="result">Результат выполнения.</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Resolve(T result) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_result = result; - m_state = State.Resolving; + m_state = PromiseState.Resolved; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// <summary> /// Выполняет обещание, сообщая об ошибке /// </summary> + /// <remarks> + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// </remarks> /// <param name="error">Исключение возникшее при выполнении операции</param> /// <exception cref="InvalidOperationException">Данное обещание уже выполнено</exception> public void Reject(Exception error) { - lock (this) { - if (m_state == State.Cancelled) + lock (m_lock) { + if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) return; - if (m_state != State.Unresolved) + if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved"); m_error = error; - m_state = State.Resolving; + m_state = PromiseState.Rejected; } - ResultHandlerInfo handler; - while (FetchNextHandler(out handler)) - InvokeHandler(handler); + OnStateChanged(); } /// <summary> @@ -125,47 +128,31 @@ /// </summary> /// <returns><c>true</c> Операция была отменена, обработчики не будут вызваны.<c>false</c> отмена не возможна, поскольку обещание уже выполнено и обработчики отработали.</returns> public bool Cancel() { - lock (this) { - if (m_state == State.Unresolved && m_cancellable) { - m_state = State.Cancelled; - EventHandler temp = Cancelled; - - if (temp != null) - temp(this, new EventArgs()); - - return true; - } else - return false; - } + return Cancel(true); } /// <summary> - /// Добавляет обработчики событий выполнения обещания. + /// Adds new handlers to this promise. /// </summary> - /// <param name="success">Обработчик успешного выполнения обещания. - /// Данному обработчику будет передан результат выполнения операции.</param> - /// <param name="error">Обработчик ошибки. Данный обработчик получит - /// исключение возникшее при выполнении операции.</param> - /// <returns>Само обещание</returns> + /// <param name="success">The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter.</param> + /// <param name="error">Handles an exception that may occur during the operation.</param> + /// <returns>The new promise chained to this one.</returns> public Promise<T> Then(ResultHandler<T> success, ErrorHandler error) { if (success == null && error == null) return this; - var medium = new Promise<T>(); + var medium = new Promise<T>(this, true); var handlerInfo = new ResultHandlerInfo(); if (success != null) handlerInfo.resultHandler = x => { - try { - success(x); - medium.Resolve(x); - } catch (Exception e) { - medium.Reject(e); - } + success(x); + medium.Resolve(x); }; else - handlerInfo.resultHandler = x => medium.Resolve(x); + handlerInfo.resultHandler = medium.Resolve; if (error != null) handlerInfo.errorHandler = x => { @@ -175,21 +162,106 @@ medium.Reject(x); }; else - handlerInfo.errorHandler = x => medium.Reject(x); + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; + } + + /// <summary> + /// Adds new handlers to this promise. + /// </summary> + /// <param name="success">The handler of the successfully completed operation. + /// This handler will recieve an operation result as a parameter.</param> + /// <param name="error">Handles an exception that may occur during the operation and returns the value which will be used as the result of the operation.</param> + /// <returns>The new promise chained to this one.</returns> + public Promise<T> Then(ResultHandler<T> success, ErrorHandler<T> error) { + if (success == null && error == null) + return this; + + var medium = new Promise<T>(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + if (error != null) + handlerInfo.errorHandler = x => { + try { + medium.Resolve(error(x)); + } catch { } + medium.Reject(x); + }; + else + handlerInfo.errorHandler = medium.Reject; AddHandler(handlerInfo); return medium; } + public Promise<T> Then(ResultHandler<T> success) { - return Then(success, null); + if (success == null) + return this; + + var medium = new Promise<T>(this, true); + + var handlerInfo = new ResultHandlerInfo(); + + if (success != null) + handlerInfo.resultHandler = x => { + success(x); + medium.Resolve(x); + }; + else + handlerInfo.resultHandler = medium.Resolve; + + handlerInfo.errorHandler = medium.Reject; + + AddHandler(handlerInfo); + + return medium; } public Promise<T> Error(ErrorHandler error) { return Then(null, error); } + /// <summary> + /// Handles error and allows to keep the promise. + /// </summary> + /// <remarks> + /// If the specified handler throws an exception, this exception will be used to reject the promise. + /// </remarks> + /// <param name="handler">The error handler which returns the result of the promise.</param> + /// <returns>New promise.</returns> + public Promise<T> Error(ErrorHandler<T> handler) { + if (handler == null) + return this; + + var medium = new Promise<T>(this, true); + + AddHandler(new ResultHandlerInfo { + errorHandler = e => { + try { + medium.Resolve(handler(e)); + } catch (Exception e2) { + medium.Reject(e2); + } + } + }); + + return medium; + } + public Promise<T> Anyway(Action handler) { if (handler == null) return this; @@ -198,6 +270,7 @@ AddHandler(new ResultHandlerInfo { resultHandler = x => { + // to avoid handler being called multiple times we handle exception by ourselfs try { handler(); medium.Resolve(x); @@ -229,20 +302,15 @@ throw new ArgumentNullException("mapper"); // создаем прицепленное обещание - Promise<TNew> chained = new Promise<TNew>(); + var chained = new Promise<TNew>(); AddHandler(new ResultHandlerInfo() { - resultHandler = delegate(T result) { - try { - // если преобразование выдаст исключение, то сработает reject сцепленного deferred - chained.Resolve(mapper(result)); - } catch (Exception e) { - chained.Reject(e); - } - }, + resultHandler = result => chained.Resolve(mapper(result)), errorHandler = delegate(Exception e) { if (error != null) - error(e); + try { + error(e); + } catch { } // в случае ошибки нужно передать исключение дальше по цепочке chained.Reject(e); } @@ -271,19 +339,21 @@ // создать посредника, к которому будут подвызяваться следующие обработчики. // когда будет выполнена реальная асинхронная операция, она обратиться к посреднику, чтобы // передать через него результаты работы. - Promise<TNew> medium = new Promise<TNew>(); + var medium = new Promise<TNew>(this, true); - AddHandler(new ResultHandlerInfo() { + AddHandler(new ResultHandlerInfo { resultHandler = delegate(T result) { - try { - chained(result).Then( - x => medium.Resolve(x), - e => medium.Reject(e) - ); - } catch (Exception e) { - // если сцепленное действие выдало исключение вместо обещания, то передаем ошибку по цепочке - medium.Reject(e); - } + if (medium.State == PromiseState.Cancelled) + return; + + var promise = chained(result); + + // notify chained operation that it's not needed + medium.Cancelled(() => promise.Cancel()); + promise.Then( + x => medium.Resolve(x), + e => medium.Reject(e) + ); }, errorHandler = delegate(Exception e) { if (error != null) @@ -300,6 +370,22 @@ return Chain(chained, null); } + public Promise<T> Cancelled(Action handler) { + if (handler == null) + return this; + lock (m_lock) { + if (m_state == PromiseState.Unresolved) + m_cancelHandlers.AddLast(handler); + else if (m_state == PromiseState.Cancelled) + handler(); + } + return this; + } + + public void HandleCancelled(Action handler) { + Cancelled(handler); + } + /// <summary> /// Дожидается отложенного обещания и в случае успеха, возвращает /// его, результат, в противном случае бросает исключение. @@ -322,52 +408,37 @@ /// <param name="timeout">Время ожидания</param> /// <returns>Результат выполнения обещания</returns> public T Join(int timeout) { - ManualResetEvent evt = new ManualResetEvent(false); + var evt = new ManualResetEvent(false); Anyway(() => evt.Set()); + Cancelled(() => evt.Set()); if (!evt.WaitOne(timeout, true)) throw new TimeoutException(); - if (m_error != null) - throw new TargetInvocationException(m_error); - else - return m_result; + switch (State) { + case PromiseState.Resolved: + return m_result; + case PromiseState.Cancelled: + throw new OperationCanceledException(); + case PromiseState.Rejected: + throw new TargetInvocationException(m_error); + default: + throw new ApplicationException(String.Format("Invalid promise state {0}", State)); + } } public T Join() { return Join(Timeout.Infinite); } - /// <summary> - /// Данный метод последовательно извлекает обработчики обещания и когда - /// их больше не осталось - ставит состояние "разрешено". - /// </summary> - /// <param name="handler">Информация об обработчике</param> - /// <returns>Признак того, что еще остались обработчики в очереди</returns> - bool FetchNextHandler(out ResultHandlerInfo handler) { - handler = default(ResultHandlerInfo); - - lock (this) { - Debug.Assert(m_state == State.Resolving); - - if (m_handlersChain.Count > 0) { - handler = m_handlersChain.First.Value; - m_handlersChain.RemoveFirst(); - return true; - } else { - m_state = State.Resolved; - return false; - } - } - } - void AddHandler(ResultHandlerInfo handler) { bool invokeRequired = false; - lock (this) { - if (m_state != State.Resolved) - m_handlersChain.AddLast(handler); - else + lock (m_lock) { + m_childrenCount++; + if (m_state == PromiseState.Unresolved) { + m_resultHandlers.AddLast(handler); + } else invokeRequired = true; } @@ -377,21 +448,102 @@ } void InvokeHandler(ResultHandlerInfo handler) { - if (m_error == null) { - try { - if (handler.resultHandler != null) - handler.resultHandler(m_result); - } catch { } - } - - if (m_error != null) { - try { - if (handler.errorHandler != null) - handler.errorHandler(m_error); - } catch { } + switch (m_state) { + case PromiseState.Resolved: + try { + if (handler.resultHandler != null) + handler.resultHandler(m_result); + } catch (Exception e) { + try { + if (handler.errorHandler != null) + handler.errorHandler(e); + } catch { } + } + break; + case PromiseState.Rejected: + try { + if (handler.errorHandler != null) + handler.errorHandler(m_error); + } catch { } + break; + default: + // do nothing + return; } } + protected virtual void OnStateChanged() { + switch (m_state) { + case PromiseState.Resolved: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.resultHandler != null) + resultHandlerInfo.resultHandler(m_result); + } catch (Exception e) { + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(e); + } catch { } + } + break; + case PromiseState.Cancelled: + foreach (var cancelHandler in m_cancelHandlers) + cancelHandler(); + break; + case PromiseState.Rejected: + foreach (var resultHandlerInfo in m_resultHandlers) + try { + if (resultHandlerInfo.errorHandler != null) + resultHandlerInfo.errorHandler(m_error); + } catch { } + break; + default: + throw new InvalidOperationException(String.Format("Promise entered an invalid state {0}", m_state)); + } + + m_resultHandlers = null; + m_cancelHandlers = null; + } + + + + public bool IsExclusive { + get { + lock (m_lock) { + return m_childrenCount <= 1; + } + } + } + + public PromiseState State { + get { + lock (m_lock) { + return m_state; + } + } + } + + protected bool Cancel(bool dependencies) { + bool result; + + lock (m_lock) { + if (m_state == PromiseState.Unresolved) { + m_state = PromiseState.Cancelled; + result = true; + } else { + result = false; + } + } + + if (result) + OnStateChanged(); + + if (dependencies && m_parent != null && m_parent.IsExclusive) { + m_parent.Cancel(); + } + + return result; + } } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/PromiseState.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + public enum PromiseState + { + Unresolved, + Resolved, + Cancelled, + Rejected + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/TaskController.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,132 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; + +namespace Implab +{ + /// <summary> + /// This class allows to interact with asyncronuos task. + /// </summary> + /// <remarks> + /// Members of this object are thread safe. + /// </remarks> + class TaskController: IProgressNotifier, ITaskController, ICancellable + { + readonly object m_lock; + string m_message; + + float m_current; + float m_max; + + bool m_cancelled; + + public event EventHandler<ValueEventArgs<string>> MessageUpdated; + public event EventHandler<ValueEventArgs<float>> ProgressUpdated; + public event EventHandler<ProgressInitEventArgs> ProgressInit; + + public TaskController() + { + m_lock = new Object(); + } + + public string Message + { + get + { + lock (m_lock) + return m_message; + } + set + { + lock (m_lock) + { + m_message = value; + OnMessageUpdated(); + } + } + } + + public float CurrentProgress + { + get + { + lock (m_lock) + return m_current; + } + set + { + lock (m_lock) + { + var prev = m_current; + m_current = value; + if (m_current >= m_max) + m_current = m_max; + if (m_current != prev) + OnProgressUpdated(); + } + } + } + + public void InitProgress(float current, float max, string message) + { + if (max < 0) + throw new ArgumentOutOfRangeException("max"); + if (current < 0 || current > max) + throw new ArgumentOutOfRangeException("current"); + + lock(m_lock) { + m_current = current; + m_max = max; + m_message = message; + OnProgressInit(); + } + } + + public bool Cancelled { + get { + lock (m_lock) + return m_cancelled; + } + } + + public bool Cancel() { + lock (m_lock) { + if (!m_cancelled) { + m_cancelled = true; + return true; + } else { + return false; + } + } + } + + protected virtual void OnMessageUpdated() + { + var temp = MessageUpdated; + if (temp != null) + { + temp(this, new ValueEventArgs<string>(m_message)); + } + } + + protected virtual void OnProgressUpdated() + { + var temp = ProgressUpdated; + if (temp != null) + { + temp(this,new ValueEventArgs<float>(m_current)); + } + } + + protected virtual void OnProgressInit() + { + var temp = ProgressInit; + if (temp != null) + { + temp(this, new ProgressInitEventArgs(m_current,m_max, m_message)); + } + } + } +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/ValueEventArgs.cs Fri Nov 08 01:27:04 2013 +0400 @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Implab +{ + [Serializable] + public class ValueEventArgs<T>: EventArgs + { + public ValueEventArgs(T value) + { + this.Value = value; + } + public T Value + { + get; + private set; + } + } +}