changeset 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400 (2014-09-25)
parents 05e6468f066f
children 2c5631b43c7d
files Implab.Test/AsyncTests.cs Implab.mono.sln Implab/Implab.csproj Implab/JSON/JSONXmlReaderOptions.cs Implab/Parallels/ArrayTraits.cs Implab/Parallels/DispatchPool.cs Implab/Parallels/MTQueue.cs Implab/Parallels/WorkerPool.cs Implab/Promise.cs Implab/SyncPool.cs Implab/SyncPoolWrapper.cs
diffstat 11 files changed, 205 insertions(+), 63 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab.Test/AsyncTests.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -291,7 +291,7 @@
         [TestMethod]
         public void ChainedMapTest() {
 
-            using (var pool = new WorkerPool(0,100,100)) {
+            using (var pool = new WorkerPool(0,10,100)) {
                 const int count = 10000;
 
                 var args = new double[count];
--- a/Implab.mono.sln	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab.mono.sln	Fri Sep 26 03:32:34 2014 +0400
@@ -16,8 +16,6 @@
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Test.mono", "Implab.Test\Implab.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
 EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Implab.Fx.Test.mono", "Implab.Fx.Test\Implab.Fx.Test.mono.csproj", "{2BD05F84-E067-4B87-9477-FDC2676A21C6}"
-EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -77,7 +75,6 @@
 	EndGlobalSection
 	GlobalSection(NestedProjects) = preSolution
 		{2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
-		{2BD05F84-E067-4B87-9477-FDC2676A21C6} = {BCA337C3-BFDC-4825-BBDB-E6D467E4E452}
 	EndGlobalSection
 	GlobalSection(MonoDevelopProperties) = preSolution
 		StartupItem = Implab\Implab.csproj
--- a/Implab/Implab.csproj	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Implab.csproj	Fri Sep 26 03:32:34 2014 +0400
@@ -7,6 +7,8 @@
     <OutputType>Library</OutputType>
     <RootNamespace>Implab</RootNamespace>
     <AssemblyName>Implab</AssemblyName>
+    <ProductVersion>8.0.30703</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
   </PropertyGroup>
   <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
     <DebugSymbols>true</DebugSymbols>
@@ -121,6 +123,8 @@
     <Compile Include="PromiseExtensions.cs" />
     <Compile Include="TransientPromiseException.cs" />
     <Compile Include="SyncContextPromise.cs" />
+    <Compile Include="SyncPool.cs" />
+    <Compile Include="SyncPoolWrapper.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
   <ItemGroup />
--- a/Implab/JSON/JSONXmlReaderOptions.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/JSON/JSONXmlReaderOptions.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -19,7 +19,9 @@
         }
 
         /// <summary>
-        /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности)
+        /// Интерпретировать массивы как множественные элементы (убирает один уровень вложенности), иначе массив
+        /// представляется в виде узла, дочерними элементами которого являются элементы массива, имена дочерних элементов
+        /// определяются свойством <see cref="ArrayItemName"/>. По умолчанию <c>false</c>.
         /// </summary>
         public bool FlattenArrays {
             get;
@@ -44,6 +46,7 @@
 
         /// <summary>
         /// Имя элемента для массивов, если не включена опция <see cref="FlattenArrays"/>.
+        /// По умолчанию <c>item</c>.
         /// </summary>
         public string ArrayItemName {
             get;
--- a/Implab/Parallels/ArrayTraits.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -153,7 +153,8 @@
             var res = new TDst[source.Length];
             var pending = source.Length;
 
-            var semaphore = new Semaphore(threads, threads);
+            object locker = new object();
+            int slots = threads;
 
             // Analysis disable AccessToDisposedClosure
             AsyncPool.InvokeNewThread(() => {
@@ -162,22 +163,28 @@
                         break; // stop processing in case of error or cancellation
                     var idx = i;
 
-                    semaphore.WaitOne();
+                    lock(locker) {
+                        while(slots == 0)
+                            Monitor.Wait(locker);
+                        slots--;
+                    }
                     try {
-                        var p1 = transform(source[i]);
-                        p1.Anyway(() => semaphore.Release());
-                        p1.Then(
-                            x => {
-                                res[idx] = x;
-                                var left = Interlocked.Decrement(ref pending);
-                                if (left == 0)
-                                    promise.Resolve(res);
-                            },
-                            e => {
-                                promise.Reject(e);
-                                throw new TransientPromiseException(e);
-                            }
-                        );
+                        transform(source[i])
+                            .Anyway(() => {
+                                lock(locker) {
+                                    slots ++;
+                                    Monitor.Pulse(locker);
+                                }
+                            })
+                            .Last(
+                                x => {
+                                    res[idx] = x;
+                                    var left = Interlocked.Decrement(ref pending);
+                                    if (left == 0)
+                                        promise.Resolve(res);
+                                },
+                                e => promise.Reject(e)
+                            );
 
                     } catch (Exception e) {
                         promise.Reject(e);
@@ -186,7 +193,7 @@
                 return 0;
             });
 
-            return promise.Anyway(semaphore.Dispose);
+            return promise;
         }
 
     }
--- a/Implab/Parallels/DispatchPool.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/DispatchPool.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -9,16 +9,17 @@
     public abstract class DispatchPool<TUnit> : IDisposable {
         readonly int m_minThreads;
         readonly int m_maxThreads;
+        readonly int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
 
         int m_createdThreads = 0; // the current size of the pool
         int m_activeThreads = 0; // the count of threads which are active
         int m_sleepingThreads = 0; // the count of currently inactive threads
         int m_maxRunningThreads = 0; // the meximum reached size of the pool
         int m_exitRequired = 0; // the pool is going to shutdown, all unused workers are released
-        int m_releaseTimeout = 100; // the timeout while the working thread will wait for the new tasks before exit
+
         int m_wakeEvents = 0; // the count of wake events
         
-        AutoResetEvent m_hasTasks = new AutoResetEvent(false);
+        readonly object m_signalLocker = new object();
 
         protected DispatchPool(int min, int max) {
             if (min < 0)
@@ -51,68 +52,76 @@
 
         public int PoolSize {
             get {
+                Thread.MemoryBarrier();
                 return m_createdThreads;
             }
         }
 
         public int ActiveThreads {
             get {
+                Thread.MemoryBarrier();
                 return m_activeThreads;
             }
         }
 
         public int MaxRunningThreads {
             get {
+                Thread.MemoryBarrier();
                 return m_maxRunningThreads;
             }
         }
 
         protected bool IsDisposed {
             get {
-                return m_exitRequired != 0;
+                Thread.MemoryBarrier();
+                return m_exitRequired == 1;
             }
         }
 
         protected abstract bool TryDequeue(out TUnit unit);
 
-        #region thread execution traits
+        #region thread signaling traits
         int SignalThread() {
             var signals = Interlocked.Increment(ref m_wakeEvents);
             if(signals == 1)
-                m_hasTasks.Set();
+                lock(m_signalLocker)
+                    Monitor.Pulse(m_signalLocker);
             return signals;
         }
 
         bool FetchSignalOrWait(int timeout) {
             var start = Environment.TickCount;
-
-            // означает, что поток владеет блокировкой и при успешном получении сигнала должен
-            // ее вернуть, чтобы другой ожидающий поток смог 
-            bool hasLock = false;
+            int signals;
+            Thread.MemoryBarrier(); // m_wakeEvents volatile first read
             do {
-                int signals;
-                do {
-                    signals = m_wakeEvents;
-                    if (signals == 0)
-                        break;
-                } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
+                signals = m_wakeEvents;
+                if (signals == 0)
+                    break;
+            } while (Interlocked.CompareExchange(ref m_wakeEvents, signals - 1, signals) != signals);
 
-                if (signals >= 1) {
-                    if (signals > 1 && hasLock)
-                        m_hasTasks.Set();
+            if (signals == 0) {
+                // no signal is fetched
+                lock(m_signalLocker) {
+                    while(m_wakeEvents == 0) {
+                        if (timeout != -1)
+                            timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+                        if(!Monitor.Wait(m_signalLocker,timeout))
+                            return false; // timeout
+                    }
+                    // m_wakeEvents > 0
+                    if (Interlocked.Decrement(ref m_wakeEvents) > 0) //syncronized
+                        Monitor.Pulse(m_signalLocker);
+
+                    // signal fetched
                     return true;
                 }
                 
-                if (timeout != -1)
-                    timeout = Math.Max(0, timeout - (Environment.TickCount - start));
+            } else {
+                // signal fetched
+                return true;
+            }
 
-                // если сигналов больше не осталось, то первый поток, который дошел сюда сбросит событие
-                // и уйдет на пустой цикл, после чего заблокируется
 
-                hasLock = true; 
-            } while (m_hasTasks.WaitOne(timeout));
-
-            return false;
         }
 
         bool Sleep(int timeout) {
@@ -131,7 +140,8 @@
         /// Запускает либо новый поток, если раньше не было ни одного потока, либо устанавливает событие пробуждение одного спящего потока
         /// </summary>
         protected void GrowPool() {
-            if (m_exitRequired != 0)
+            Thread.MemoryBarrier();
+            if (m_exitRequired == 1)
                 return;
             if (m_sleepingThreads > m_wakeEvents) {
                 //Console.WriteLine("Waking threads (sleeps {0}, pending {1})", m_sleepingThreads, m_wakeEvents);
@@ -204,7 +214,7 @@
             // use spins to allocate slot for the new thread
             do {
                 current = m_createdThreads;
-                if (current >= m_maxThreads || m_exitRequired != 0)
+                if (current >= m_maxThreads || m_exitRequired == 1)
                     // no more slots left or the pool has been disposed
                     return false;
             } while (current != Interlocked.CompareExchange(ref m_createdThreads, current + 1, current));
@@ -227,6 +237,7 @@
             last = false;
             int current;
             // use spins to release slot for the new thread
+            Thread.MemoryBarrier();
             do {
                 current = m_createdThreads;
                 if (current <= m_minThreads && m_exitRequired == 0)
@@ -264,6 +275,7 @@
                 // slot successfully allocated
                 var worker = new Thread(this.Worker);
                 worker.IsBackground = true;
+                Interlocked.Increment(ref m_activeThreads);
                 worker.Start();
 
                 return true;
@@ -277,15 +289,14 @@
         protected virtual void Worker() {
             TUnit unit;
             //Console.WriteLine("{0}: Active", Thread.CurrentThread.ManagedThreadId);
-            Interlocked.Increment(ref m_activeThreads);
+            int count = 0;;
+            Thread.MemoryBarrier();
             do {
                 // exit if requested
-                if (m_exitRequired != 0) {
+                if (m_exitRequired == 1) {
                     // release the thread slot
                     Interlocked.Decrement(ref m_activeThreads);
-                    if (ReleaseThreadSlotAnyway()) // it was the last worker
-                        m_hasTasks.Dispose();
-                    else
+                    if (!ReleaseThreadSlotAnyway()) // it was the last worker
                         SignalThread(); // wake next worker
                     break;
                 }
@@ -293,14 +304,17 @@
                 // fetch task
                 if (TryDequeue(out unit)) {
                     InvokeUnit(unit);
+                    count ++;
                     continue;
                 }
                 Interlocked.Decrement(ref m_activeThreads);
 
+                Console.WriteLine("{0}: Suspend processed({1})", Thread.CurrentThread.ManagedThreadId,count);
                 // entering suspend state
                 // keep this thread and wait                
                 if (!Suspend())
                     break;
+                count = 0;
                 //Console.WriteLine("{0}: Awake", Thread.CurrentThread.ManagedThreadId);
                 Interlocked.Increment(ref m_activeThreads);
             } while (true);
@@ -309,15 +323,10 @@
 
         protected virtual void Dispose(bool disposing) {
             if (disposing) {
-                if (m_exitRequired == 0) {
-                    if (Interlocked.CompareExchange(ref m_exitRequired, 1, 0) != 0)
-                        return;
-
+                if (0 == Interlocked.CompareExchange(ref m_exitRequired, 1, 0)) { // implies memory barrier
                     // wake sleeping threads
                     if (m_createdThreads > 0)
                         SignalThread();
-                    else
-                        m_hasTasks.Dispose();
                     GC.SuppressFinalize(this);
                 }
             }
--- a/Implab/Parallels/MTQueue.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/MTQueue.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -18,6 +18,8 @@
         Node m_last;
 
         public void Enqueue(T value) {
+            Thread.MemoryBarrier();
+
             var last = m_last;
             var next = new Node(value);
 
@@ -35,6 +37,7 @@
             Node next = null;
             value = default(T);
 
+            Thread.MemoryBarrier();
             do {
                 first = m_first;
                 if (first == null)
--- a/Implab/Parallels/WorkerPool.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/WorkerPool.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -12,20 +12,24 @@
         MTQueue<Action> m_queue = new MTQueue<Action>();
         int m_queueLength = 0;
         readonly int m_threshold = 1;
+        int m_workers = 0;
 
         public WorkerPool(int minThreads, int maxThreads, int threshold)
             : base(minThreads, maxThreads) {
             m_threshold = threshold;
+            m_workers = minThreads;
             InitPool();
         }
 
         public WorkerPool(int minThreads, int maxThreads) :
             base(minThreads, maxThreads) {
+            m_workers = minThreads;
             InitPool();
         }
 
         public WorkerPool(int threads)
             : base(threads) {
+            m_workers = threads;
             InitPool();
         }
 
@@ -62,8 +66,10 @@
             var len = Interlocked.Increment(ref m_queueLength);
             m_queue.Enqueue(unit);
 
-            if (len > m_threshold*ActiveThreads)
+            if (len > m_threshold * m_workers) {
+                Interlocked.Increment(ref m_workers);
                 GrowPool();
+            }
         }
 
         protected override bool TryDequeue(out Action unit) {
@@ -85,8 +91,10 @@
             // Suspend
             //    queueLength > 0
             // continue
+            Thread.MemoryBarrier();
             if (m_queueLength > 0)
                 return true;
+            Interlocked.Decrement(ref m_workers);
             return base.Suspend();
         }
 
--- a/Implab/Promise.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Promise.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -142,18 +142,20 @@
 
         void WaitTransition() {
             while (m_state == TRANSITIONAL_STATE) {
-                /* noop */
+                Thread.MemoryBarrier();
             }
         }
 
         public bool IsResolved {
             get {
+                Thread.MemoryBarrier();
                 return m_state > 1;
             }
         }
 
         public bool IsCancelled {
             get {
+                Thread.MemoryBarrier();
                 return m_state == CANCELLED_STATE;
             }
         }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/SyncPool.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -0,0 +1,85 @@
+using System;
+using Implab.Parallels;
+using System.Threading;
+
+namespace Implab {
+    /*public class SyncPool<T> : IDisposable {
+        readonly Func<T> m_factory;
+        readonly Action<T> m_cleanup;
+        readonly int m_size;
+        readonly MTQueue<T> m_queue = new MTQueue<T>();
+
+        volatile bool m_disposed;
+
+        volatile int m_count;
+
+        public SyncPool(Func<T> factory, Action<T> cleanup, int size) {
+            Safe.ArgumentNotNull(factory, "factory");
+            Safe.ArgumentInRange(size, 1, size, "size");
+
+            m_factory = factory;
+            m_cleanup = cleanup;
+            m_size = size;
+        }
+
+        public SyncPool(Func<T> factory, Action<T> cleanup) : this(factory,cleanup,Environment.ProcessorCount+1) {
+        }
+
+        public SyncPool(Func<T> factory) : this(factory,null,Environment.ProcessorCount+1) {
+        }
+
+        public SyncPoolWrapper<T> Allocate() {
+            if (m_disposed)
+                throw new ObjectDisposedException(this.ToString());
+
+            T instance;
+            if (m_queue.TryDequeue(out instance)) {
+                Interlocked.Decrement(ref m_count);
+                return instance;
+            } else {
+                instance = m_factory();
+            }
+            return new SyncPoolWrapper<T>(instance, this);
+        }
+
+        public void Release(T instance) {
+            if (m_count < m_size && !m_disposed) {
+                Interlocked.Increment(ref m_count);
+
+                if (m_cleanup != null)
+                    m_cleanup(instance);
+
+                m_queue.Enqueue(instance);
+
+                // пока элемент возвращался в кеш, была начата операция освобождения всего кеша
+                // и возможно уже законцена, в таком случае следует извлечь элемент обратно и
+                // освободить его. Если операция освобождения кеша еще не заврешилась, то будет
+                // изъят и освобожден произвольный элемен, что не повлияет на ход всего процесса.
+                if (m_disposed && m_queue.TryDequeue(out instance))
+                    Safe.Dispose(instance);
+
+            } else {
+                Safe.Dispose(instance);
+            }
+        }
+
+        protected virtual void Dispose(bool disposing) {
+            if (disposing) {
+                m_disposed = true;
+                T instance;
+                while (m_queue.TryDequeue(out instance))
+                    Safe.Dispose(instance);
+            }
+        }
+
+        #region IDisposable implementation
+
+        public void Dispose() {
+            Dispose(true);
+            GC.SuppressFinalize(this);
+        }
+
+        #endregion
+    }*/
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/SyncPoolWrapper.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -0,0 +1,24 @@
+using System;
+
+namespace Implab {
+    /*public struct SyncPoolWrapper<T> : IDisposable {
+        readonly T m_value;
+        readonly SyncPool<T> m_pool;
+
+        internal SyncPoolWrapper(T value, SyncPool<T> pool) {
+            m_value = value;
+            m_pool = pool;
+        }
+
+        public T Value {
+            get { return m_value; }
+        }
+
+        #region IDisposable implementation
+        public void Dispose() {
+            m_pool.Release(m_value);
+        }
+        #endregion
+    }*/
+}
+