# HG changeset patch # User cin # Date 1383841226 -14400 # Node ID 5a4b735ba66981d684eb477f0f5e697d2200bf82 # Parent 0f982f9b7d4d0bf83e1700783d6b0eb8fe01aec5 sync diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab.Test/AsyncTests.cs --- a/Implab.Test/AsyncTests.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab.Test/AsyncTests.cs Thu Nov 07 20:20:26 2013 +0400 @@ -242,6 +242,38 @@ } [TestMethod] + public void ChainedMapTest() { + + using (var pool = new WorkerPool(1,10)) { + int count = 10000; + + double[] args = new double[count]; + var rand = new Random(); + + for (int i = 0; i < count; i++) + args[i] = rand.NextDouble(); + + var t = Environment.TickCount; + var res = args + .ChainedMap( + x => pool.Invoke( + () => Math.Sin(x * x) + ), + 4 + ) + .Join(); + + Console.WriteLine("Map complete in {0} ms", Environment.TickCount - t); + + t = Environment.TickCount; + for (int i = 0; i < count; i++) + Assert.AreEqual(Math.Sin(args[i] * args[i]), res[i]); + Console.WriteLine("Verified in {0} ms", Environment.TickCount - t); + Console.WriteLine("Max workers: {0}", pool.MaxRunningThreads); + } + } + + [TestMethod] public void ParallelForEachTest() { int count = 100000; diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab.suo Binary file Implab.suo has changed diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab/Parallels/ArrayTraits.cs --- a/Implab/Parallels/ArrayTraits.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Thu Nov 07 20:20:26 2013 +0400 @@ -39,26 +39,14 @@ } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_action(m_source[unit]); - int pending; - do { - pending = m_pending; - } while (pending != Interlocked.CompareExchange(ref m_pending, pending - 1, pending)); - pending--; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_source.Length); } catch (Exception e) { @@ -101,26 +89,14 @@ } protected override bool TryDequeue(out int unit) { - int index; - unit = -1; - do { - index = m_next; - if (index >= m_source.Length) - return false; - } while (index != Interlocked.CompareExchange(ref m_next, index + 1, index)); - - unit = index; - return true; + unit = Interlocked.Increment(ref m_next) - 1; + return unit >= m_source.Length ? false : true; } protected override void InvokeUnit(int unit) { try { m_dest[unit] = m_transform(m_source[unit]); - int pending; - do { - pending = m_pending; - } while ( pending != Interlocked.CompareExchange(ref m_pending,pending -1, pending)); - pending --; + var pending = Interlocked.Decrement(ref m_pending); if (pending == 0) m_promise.Resolve(m_dest); } catch (Exception e) { @@ -148,5 +124,48 @@ var iter = new ArrayIterator(source, action, threads); return iter.Promise; } + + public static Promise ChainedMap(this TSrc[] source, ChainedOperation transform, int threads) { + if (source == null) + throw new ArgumentNullException("source"); + if (transform == null) + throw new ArgumentNullException("transform"); + if (threads <= 0) + throw new ArgumentOutOfRangeException("Threads number must be greater then zero"); + + var promise = new Promise(); + var res = new TDst[source.Length]; + var pending = source.Length; + var semaphore = new Semaphore(threads, threads); + + AsyncPool.InvokeNewThread(() => { + for (int i = 0; i < source.Length; i++) { + if(promise.State != PromiseState.Unresolved) + break; // stop processing in case of error or cancellation + var idx = i; + semaphore.WaitOne(); + try { + var p1 = transform(source[i]); + p1.Anyway(() => semaphore.Release()); + p1.Cancelled(() => semaphore.Release()); + p1.Then( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); + + } catch (Exception e) { + promise.Reject(e); + } + } + return 0; + }); + + return promise.Anyway(() => semaphore.Dispose()); + } } } diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab/Parallels/AsyncPool.cs --- a/Implab/Parallels/AsyncPool.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Parallels/AsyncPool.cs Thu Nov 07 20:20:26 2013 +0400 @@ -36,7 +36,6 @@ } }); worker.IsBackground = true; - worker.Start(); return p; diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab/Parallels/DispatchPool.cs --- a/Implab/Parallels/DispatchPool.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Parallels/DispatchPool.cs Thu Nov 07 20:20:26 2013 +0400 @@ -63,9 +63,10 @@ } bool StartWorker() { - var current = m_runningThreads; + int current; // use spins to allocate slot for the new thread do { + current = m_runningThreads; if (current >= m_maxThreads || m_exitRequired != 0) // no more slots left or the pool has been disposed return false; @@ -84,24 +85,33 @@ protected abstract bool TryDequeue(out TUnit unit); - protected virtual void WakeNewWorker() { + protected virtual void WakeNewWorker(bool extend) { if (m_suspended > 0) m_hasTasks.Set(); else StartWorker(); } + /// + /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока + /// + protected void StartIfIdle() { + int threads; + do { + + } + } + + protected virtual void Suspend() { + m_hasTasks.WaitOne(); + } + bool FetchTask(out TUnit unit) { do { // exit if requested if (m_exitRequired != 0) { // release the thread slot - int running; - do { - running = m_runningThreads; - } while (running != Interlocked.CompareExchange(ref m_runningThreads, running - 1, running)); - running--; - + var running = Interlocked.Decrement(ref m_runningThreads); if (running == 0) // it was the last worker m_hasTasks.Dispose(); else @@ -112,7 +122,7 @@ // fetch task if (TryDequeue(out unit)) { - WakeNewWorker(); + WakeNewWorker(true); return true; } @@ -122,19 +132,21 @@ do { runningThreads = m_runningThreads; if (runningThreads <= m_minThreads) { + // check wheather this is the last thread and we have tasks + exit = false; break; } } while (runningThreads != Interlocked.CompareExchange(ref m_runningThreads, runningThreads - 1, runningThreads)); if (exit) { - Interlocked.Decrement(ref m_runningThreads); return false; } - // keep this thread and wait + // entering suspend state Interlocked.Increment(ref m_suspended); - m_hasTasks.WaitOne(); + // keep this thread and wait + Suspend(); Interlocked.Decrement(ref m_suspended); } while (true); } diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab/Parallels/WorkerPool.cs --- a/Implab/Parallels/WorkerPool.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Parallels/WorkerPool.cs Thu Nov 07 20:20:26 2013 +0400 @@ -10,20 +10,27 @@ MTQueue m_queue = new MTQueue(); int m_queueLength = 0; + readonly int m_threshold = 1; - public WorkerPool(int minThreads, int maxThreads) + public WorkerPool(int minThreads, int maxThreads, int threshold) : base(minThreads, maxThreads) { - InitPool(); + m_threshold = threshold; + InitPool(); + } + + public WorkerPool(int minThreads, int maxThreads) : + base(minThreads, maxThreads) { + InitPool(); } public WorkerPool(int threads) : base(threads) { - InitPool(); + InitPool(); } public WorkerPool() : base() { - InitPool(); + InitPool(); } public Promise Invoke(Func task) { @@ -47,11 +54,20 @@ protected void EnqueueTask(Action unit) { Debug.Assert(unit != null); - Interlocked.Increment(ref m_queueLength); + var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - // if there are sleeping threads in the pool wake one - // probably this will lead a dry run - WakeNewWorker(); + + if (ThreadCount == 0) + // force to start + WakeNewWorker(false); + } + + protected override void WakeNewWorker(bool extend) { + if (extend && m_queueLength <= m_threshold) + // in this case we are in active thread and it request for additional workers + // satisfy it only when queue is longer than threshold + return; + base.WakeNewWorker(extend); } protected override bool TryDequeue(out Action unit) { @@ -65,5 +81,10 @@ protected override void InvokeUnit(Action unit) { unit(); } + + protected override void Suspend() { + if (m_queueLength == 0) + base.Suspend(); + } } } diff -r 0f982f9b7d4d -r 5a4b735ba669 Implab/Promise.cs --- a/Implab/Promise.cs Thu Nov 07 03:41:32 2013 +0400 +++ b/Implab/Promise.cs Thu Nov 07 20:20:26 2013 +0400 @@ -103,11 +103,16 @@ /// /// Выполняет обещание, сообщая об ошибке /// + /// + /// Поскольку обещание должно работать в многопточной среде, при его выполнении сразу несколько потоков + /// могу вернуть ошибку, при этом только первая будет использована в качестве результата, остальные + /// будут проигнорированы. + /// /// Исключение возникшее при выполнении операции /// Данное обещание уже выполнено public void Reject(Exception error) { lock (m_lock) { - if (m_state == PromiseState.Cancelled) + if (m_state == PromiseState.Cancelled || m_state == PromiseState.Rejected) return; if (m_state != PromiseState.Unresolved) throw new InvalidOperationException("The promise is already resolved");