Mercurial > pub > ImplabNet
changeset 80:4f20870d0816 v2
added memory barriers
author | cin |
---|---|
date | Fri, 26 Sep 2014 03:32:34 +0400 |
parents | 05e6468f066f |
children | 2c5631b43c7d |
files | Implab.Test/AsyncTests.cs Implab.mono.sln Implab/Implab.csproj Implab/JSON/JSONXmlReaderOptions.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/MTQueue.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs Implab/SyncPool.cs Implab/SyncPoolWrapper.cs |
diffstat | 11 files changed, 205 insertions(+), 63 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab.Test/AsyncTests.cs Fri Sep 26 03:32:34 2014 +0400 @@ -291,7 +291,7 @@ [TestMethod] public void ChainedMapTest() { - using (var pool = new WorkerPool(0,100,100)) { + using (var pool = new WorkerPool(0,10,100)) { const int count = 10000; var args = new double[count];
--- a/Implab.mono.sln Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab.mono.sln Fri Sep 26 03:32:34 2014 +0400 @@ -16,8 +16,6 @@ EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test.mono", "Implab.Test\Implab.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx.Test.mono", "Implab.Fx.Test\Implab.Fx.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}" -EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -77,7 +75,6 @@ EndGlobalSection GlobalSection(NestedProjects) = preSolution {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452} - {2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452} EndGlobalSection GlobalSection(MonoDevelopProperties) = preSolution StartupItem = Implab\Implab.csproj
--- a/Implab/Implab.csproj Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Implab.csproj Fri Sep 26 03:32:34 2014 +0400 @@ -7,6 +7,8 @@ <OutputType>Library</OutputType> <RootNamespace>Implab</RootNamespace> <AssemblyName>Implab</AssemblyName> + <ProductVersion>8.0.30703</ProductVersion> + <SchemaVersion>2.0</SchemaVersion> </PropertyGroup> <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> <DebugSymbols>true</DebugSymbols> @@ -121,6 +123,8 @@ <Compile Include="PromiseExtensions.cs" /> <Compile Include="TransientPromiseException.cs" /> <Compile Include="SyncContextPromise.cs" /> + <Compile Include="SyncPool.cs" /> + <Compile Include="SyncPoolWrapper.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup />
--- a/Implab/JSON/JSONXmlReaderOptions.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/JSON/JSONXmlReaderOptions.cs Fri Sep 26 03:32:34 2014 +0400 @@ -19,7 +19,9 @@ } /// <summary> - /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности) + /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности), иначе массив + /// представляется в виде узла, дочерними элементами которого являются элементы массива, имена дочерних элементов + /// определяются свойством <see cref="ArrayItemName"/>. По умолчанию <c>false</c>. /// </summary> public bool FlattenArrays { get; @@ -44,6 +46,7 @@ /// <summary> /// Имя элемента для массивов, если не включена опция <see cref="FlattenArrays"/>. + /// По умолчанию <c>item</c>. /// </summary> public string ArrayItemName { get;
--- a/Implab/Parallels/ArrayTraits.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/ArrayTraits.cs Fri Sep 26 03:32:34 2014 +0400 @@ -153,7 +153,8 @@ var res = new TDst[source.Length]; var pending = source.Length; - var semaphore = new Semaphore(threads, threads); + object locker = new object(); + int slots = threads; // Analysis disable AccessToDisposedClosure AsyncPool.InvokeNewThread(() => { @@ -162,22 +163,28 @@ break; // stop processing in case of error or cancellation var idx = i; - semaphore.WaitOne(); + lock(locker) { + while(slots == 0) + Monitor.Wait(locker); + slots--; + } try { - var p1 = transform(source[i]); - p1.Anyway(() => semaphore.Release()); - p1.Then( - x => { - res[idx] = x; - var left = Interlocked.Decrement(ref pending); - if (left == 0) - promise.Resolve(res); - }, - e => { - promise.Reject(e); - throw new TransientPromiseException(e); - } - ); + transform(source[i]) + .Anyway(() => { + lock(locker) { + slots ++; + Monitor.Pulse(locker); + } + }) + .Last( + x => { + res[idx] = x; + var left = Interlocked.Decrement(ref pending); + if (left == 0) + promise.Resolve(res); + }, + e => promise.Reject(e) + ); } catch (Exception e) { promise.Reject(e); @@ -186,7 +193,7 @@ return 0; }); - return promise.Anyway(semaphore.Dispose); + return promise; } }
--- a/Implab/Parallels/DispatchPool.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/DispatchPool.cs Fri Sep 26 03:32:34 2014 +0400 @@ -9,16 +9,17 @@ public abstract class DispatchPool<TUnit> : IDisposable { readonly int m_minThreads; readonly int m_maxThreads; + readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit int m_createdThreads = 0; // the current size of the pool int m_activeThreads = 0; // the count of threads which are active int m_sleepingThreads = 0; // the count of currently inactive threads int m_maxRunningThreads = 0; // the meximum reached size of the pool int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released - int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit + int m_wakeEvents = 0; // the count of wake events - AutoResetEvent m_hasTasks = new AutoResetEvent(false); + readonly object m_signalLocker = new object(); protected DispatchPool(int min, int max) { if (min < 0) @@ -51,68 +52,76 @@ public int PoolSize { get { + Thread.MemoryBarrier(); return m_createdThreads; } } public int ActiveThreads { get { + Thread.MemoryBarrier(); return m_activeThreads; } } public int MaxRunningThreads { get { + Thread.MemoryBarrier(); return m_maxRunningThreads; } } protected bool IsDisposed { get { - return m_exitRequired != 0; + Thread.MemoryBarrier(); + return m_exitRequired == 1; } } protected abstract bool TryDequeue(out TUnit unit); - #region thread execution traits + #region thread signaling traits int SignalThread() { var signals = Interlocked.Increment(ref m_wakeEvents); if(signals == 1) - m_hasTasks.Set(); + lock(m_signalLocker) + Monitor.Pulse(m_signalLocker); return signals; } bool FetchSignalOrWait(int timeout) { var start = Environment.TickCount; - - // означает, что поток владеет блокировкой и при успешном получении сигнала должен - // ее вернуть, чтобы другой ожидающий поток смог - bool hasLock = false; + int signals; + Thread.MemoryBarrier(); // m_wakeEvents volatile first read do { - int signals; - do { - signals = m_wakeEvents; - if (signals == 0) - break; - } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); + signals = m_wakeEvents; + if (signals == 0) + break; + } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals); - if (signals >= 1) { - if (signals > 1 && hasLock) - m_hasTasks.Set(); + if (signals == 0) { + // no signal is fetched + lock(m_signalLocker) { + while(m_wakeEvents == 0) { + if (timeout != -1) + timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + if(!Monitor.Wait(m_signalLocker,timeout)) + return false; // timeout + } + // m_wakeEvents > 0 + if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized + Monitor.Pulse(m_signalLocker); + + // signal fetched return true; } - if (timeout != -1) - timeout = Math.Max(0, timeout - (Environment.TickCount - start)); + } else { + // signal fetched + return true; + } - // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие - // и уйдет на пустой цикл, после чего заблокируется - hasLock = true; - } while (m_hasTasks.WaitOne(timeout)); - - return false; } bool Sleep(int timeout) { @@ -131,7 +140,8 @@ /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока /// </summary> protected void GrowPool() { - if (m_exitRequired != 0) + Thread.MemoryBarrier(); + if (m_exitRequired == 1) return; if (m_sleepingThreads > m_wakeEvents) { //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents); @@ -204,7 +214,7 @@ // use spins to allocate slot for the new thread do { current = m_createdThreads; - if (current >= m_maxThreads || m_exitRequired != 0) + if (current >= m_maxThreads || m_exitRequired == 1) // no more slots left or the pool has been disposed return false; } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current)); @@ -227,6 +237,7 @@ last = false; int current; // use spins to release slot for the new thread + Thread.MemoryBarrier(); do { current = m_createdThreads; if (current <= m_minThreads && m_exitRequired == 0) @@ -264,6 +275,7 @@ // slot successfully allocated var worker = new Thread(this.Worker); worker.IsBackground = true; + Interlocked.Increment(ref m_activeThreads); worker.Start(); return true; @@ -277,15 +289,14 @@ protected virtual void Worker() { TUnit unit; //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId); - Interlocked.Increment(ref m_activeThreads); + int count = 0;; + Thread.MemoryBarrier(); do { // exit if requested - if (m_exitRequired != 0) { + if (m_exitRequired == 1) { // release the thread slot Interlocked.Decrement(ref m_activeThreads); - if (ReleaseThreadSlotAnyway()) // it was the last worker - m_hasTasks.Dispose(); - else + if (!ReleaseThreadSlotAnyway()) // it was the last worker SignalThread(); // wake next worker break; } @@ -293,14 +304,17 @@ // fetch task if (TryDequeue(out unit)) { InvokeUnit(unit); + count ++; continue; } Interlocked.Decrement(ref m_activeThreads); + Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count); // entering suspend state // keep this thread and wait if (!Suspend()) break; + count = 0; //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId); Interlocked.Increment(ref m_activeThreads); } while (true); @@ -309,15 +323,10 @@ protected virtual void Dispose(bool disposing) { if (disposing) { - if (m_exitRequired == 0) { - if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0) - return; - + if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier // wake sleeping threads if (m_createdThreads > 0) SignalThread(); - else - m_hasTasks.Dispose(); GC.SuppressFinalize(this); } }
--- a/Implab/Parallels/MTQueue.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/MTQueue.cs Fri Sep 26 03:32:34 2014 +0400 @@ -18,6 +18,8 @@ Node m_last; public void Enqueue(T value) { + Thread.MemoryBarrier(); + var last = m_last; var next = new Node(value); @@ -35,6 +37,7 @@ Node next = null; value = default(T); + Thread.MemoryBarrier(); do { first = m_first; if (first == null)
--- a/Implab/Parallels/WorkerPool.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Parallels/WorkerPool.cs Fri Sep 26 03:32:34 2014 +0400 @@ -12,20 +12,24 @@ MTQueue<Action> m_queue = new MTQueue<Action>(); int m_queueLength = 0; readonly int m_threshold = 1; + int m_workers = 0; public WorkerPool(int minThreads, int maxThreads, int threshold) : base(minThreads, maxThreads) { m_threshold = threshold; + m_workers = minThreads; InitPool(); } public WorkerPool(int minThreads, int maxThreads) : base(minThreads, maxThreads) { + m_workers = minThreads; InitPool(); } public WorkerPool(int threads) : base(threads) { + m_workers = threads; InitPool(); } @@ -62,8 +66,10 @@ var len = Interlocked.Increment(ref m_queueLength); m_queue.Enqueue(unit); - if (len > m_threshold*ActiveThreads) + if (len > m_threshold * m_workers) { + Interlocked.Increment(ref m_workers); GrowPool(); + } } protected override bool TryDequeue(out Action unit) { @@ -85,8 +91,10 @@ // Suspend // queueLength > 0 // continue + Thread.MemoryBarrier(); if (m_queueLength > 0) return true; + Interlocked.Decrement(ref m_workers); return base.Suspend(); }
--- a/Implab/Promise.cs Mon Sep 22 18:20:49 2014 +0400 +++ b/Implab/Promise.cs Fri Sep 26 03:32:34 2014 +0400 @@ -142,18 +142,20 @@ void WaitTransition() { while (m_state == TRANSITIONAL_STATE) { - /* noop */ + Thread.MemoryBarrier(); } } public bool IsResolved { get { + Thread.MemoryBarrier(); return m_state > 1; } } public bool IsCancelled { get { + Thread.MemoryBarrier(); return m_state == CANCELLED_STATE; } }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/SyncPool.cs Fri Sep 26 03:32:34 2014 +0400 @@ -0,0 +1,85 @@ +using System; +using Implab.Parallels; +using System.Threading; + +namespace Implab { + /*public class SyncPool<T> : IDisposable { + readonly Func<T> m_factory; + readonly Action<T> m_cleanup; + readonly int m_size; + readonly MTQueue<T> m_queue = new MTQueue<T>(); + + volatile bool m_disposed; + + volatile int m_count; + + public SyncPool(Func<T> factory, Action<T> cleanup, int size) { + Safe.ArgumentNotNull(factory, "factory"); + Safe.ArgumentInRange(size, 1, size, "size"); + + m_factory = factory; + m_cleanup = cleanup; + m_size = size; + } + + public SyncPool(Func<T> factory, Action<T> cleanup) : this(factory,cleanup,Environment.ProcessorCount+1) { + } + + public SyncPool(Func<T> factory) : this(factory,null,Environment.ProcessorCount+1) { + } + + public SyncPoolWrapper<T> Allocate() { + if (m_disposed) + throw new ObjectDisposedException(this.ToString()); + + T instance; + if (m_queue.TryDequeue(out instance)) { + Interlocked.Decrement(ref m_count); + return instance; + } else { + instance = m_factory(); + } + return new SyncPoolWrapper<T>(instance, this); + } + + public void Release(T instance) { + if (m_count < m_size && !m_disposed) { + Interlocked.Increment(ref m_count); + + if (m_cleanup != null) + m_cleanup(instance); + + m_queue.Enqueue(instance); + + // пока элемент возвращался в кеш, была начата операция освобождения всего кеша + // и возможно уже законцена, в таком случае следует извлечь элемент обратно и + // освободить его. Если операция освобождения кеша еще не заврешилась, то будет + // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса. + if (m_disposed && m_queue.TryDequeue(out instance)) + Safe.Dispose(instance); + + } else { + Safe.Dispose(instance); + } + } + + protected virtual void Dispose(bool disposing) { + if (disposing) { + m_disposed = true; + T instance; + while (m_queue.TryDequeue(out instance)) + Safe.Dispose(instance); + } + } + + #region IDisposable implementation + + public void Dispose() { + Dispose(true); + GC.SuppressFinalize(this); + } + + #endregion + }*/ +} +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/SyncPoolWrapper.cs Fri Sep 26 03:32:34 2014 +0400 @@ -0,0 +1,24 @@ +using System; + +namespace Implab { + /*public struct SyncPoolWrapper<T> : IDisposable { + readonly T m_value; + readonly SyncPool<T> m_pool; + + internal SyncPoolWrapper(T value, SyncPool<T> pool) { + m_value = value; + m_pool = pool; + } + + public T Value { + get { return m_value; } + } + + #region IDisposable implementation + public void Dispose() { + m_pool.Release(m_value); + } + #endregion + }*/ +} +