changeset 136:e9e7940c7d98 v2

shared locks + tests
author cin
date Mon, 16 Feb 2015 01:14:09 +0300 (2015-02-15)
parents 656815cb7147
children 238e15580926
files Implab.Test/AsyncTests.cs Implab/Parallels/SharedLock.cs Implab/PromiseExtensions.cs MonoPlay/Program.cs
diffstat 4 files changed, 296 insertions(+), 33 deletions(-) [+]
line wrap: on
line diff
--- a/Implab.Test/AsyncTests.cs	Fri Feb 13 02:08:01 2015 +0300
+++ b/Implab.Test/AsyncTests.cs	Mon Feb 16 01:14:09 2015 +0300
@@ -774,6 +774,79 @@
                 Assert.IsTrue(pSurvive.Join());
             }
         }
+
+        [TestMethod]
+        public void SharedLockTest() {
+            var l = new SharedLock();
+            int shared = 0;
+            int exclusive = 0;
+            var s1 = new Signal();
+            var log = new AsyncQueue<string>();
+
+            try {
+                AsyncPool.RunThread(
+                    () => {
+                        log.Enqueue("Reader #1 started");
+                        try {
+                            l.LockShared();
+                            log.Enqueue("Reader #1 lock got");
+                            if (Interlocked.Increment(ref shared) == 2)
+                                s1.Set();
+                            s1.Wait();
+                            log.Enqueue("Reader #1 finished");
+                            Interlocked.Decrement(ref shared);
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Reader #1 lock released");
+                        }
+                    },
+                    () => {
+                        log.Enqueue("Reader #2 started");
+
+                        try {
+                            l.LockShared();
+                            log.Enqueue("Reader #2 lock got");
+
+                            if (Interlocked.Increment(ref shared) == 2)
+                                s1.Set();
+                            s1.Wait();
+                            log.Enqueue("Reader #2 upgrading to writer");
+                            Interlocked.Decrement(ref shared);
+                            l.Upgrade();
+                            log.Enqueue("Reader #2 upgraded");
+
+                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
+                            Assert.AreEqual(0, shared);
+                            log.Enqueue("Reader #2 finished");
+                            Interlocked.Decrement(ref exclusive);
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Reader #2 lock released");
+                        }
+                    },
+                    () => {
+                        log.Enqueue("Writer #1 started");
+                        try {
+                            l.LockExclusive();
+                            log.Enqueue("Writer #1 got the lock");
+                            Assert.AreEqual(1, Interlocked.Increment(ref exclusive));
+                            Interlocked.Decrement(ref exclusive);
+                            log.Enqueue("Writer #1 is finished");
+                        } finally {
+                            l.Release();
+                            log.Enqueue("Writer #1 lock released");
+                        }
+                    }
+                ).Bundle().Join(1000);
+                log.Enqueue("Done");
+            } catch(Exception error) {
+                log.Enqueue(error.Message);
+                throw;
+            } finally {
+                foreach (var m in log)
+                    Console.WriteLine(m);
+            }
+        }
     }
 }
 
--- a/Implab/Parallels/SharedLock.cs	Fri Feb 13 02:08:01 2015 +0300
+++ b/Implab/Parallels/SharedLock.cs	Mon Feb 16 01:14:09 2015 +0300
@@ -8,23 +8,53 @@
     /// </summary>
     public class SharedLock {
         readonly object m_lock = new object();
+        // the count of locks currently acquired by clients
         int m_locks;
+        // the count of pending requests for upgrade
+        int m_upgrades;
         bool m_exclusive;
 
         public bool LockExclusive(int timeout) {
             lock (m_lock) {
-                if (m_locks > 0 && !Monitor.Wait(m_lock, timeout))
-                    return false;
+                var dt = timeout;
+                if (m_locks > m_upgrades) {
+                    var t1 = Environment.TickCount;
+                    do {
+                        if (!Monitor.Wait(m_lock, timeout))
+                            return false;
+
+                        if (m_locks == m_upgrades)
+                            break;
+
+                        if (timeout > 0) {
+                            dt = timeout - Environment.TickCount + t1;
+                            if (dt < 0)
+                                return false;
+                        }
+                    } while(true);
+                }
                 m_exclusive = true;
-                m_locks = 1;
+                m_locks ++;
                 return true;
             }
         }
 
         public void LockExclusive() {
-            LockExclusive(-1);
+            lock (m_lock) {
+
+                while (m_locks > m_upgrades)
+                    Monitor.Wait(m_lock);
+                            
+                m_exclusive = true;
+                m_locks ++;
+            }
         }
 
+        /// <summary>
+        /// Acquires a shared lock.
+        /// </summary>
+        /// <returns><c>true</c>, if the shared lock was acquired, <c>false</c> if the specified timeout was expired.</returns>
+        /// <param name="timeout">Timeout.</param>
         public bool LockShared(int timeout) {
             lock (m_lock) {
                 if (!m_exclusive) {
@@ -32,45 +62,141 @@
                     return true;
                 }
 
-                if (m_locks == 0) {
+                if (m_locks == m_upgrades) {
                     m_exclusive = false;
                     m_locks = 1;
                     return true;
                 }
-                
-                if (Monitor.Wait(m_lock, timeout)) {
-                    Debug.Assert(m_locks == 0);
-                    m_locks = 1;
+
+                var t1 = Environment.TickCount;
+                var dt = timeout;
+                do {
+                    if (!Monitor.Wait(m_lock, dt))
+                        return false;
+
+                    if (m_locks == m_upgrades || !m_exclusive)
+                        break;
+
+                    if (timeout >= 0) {
+                        dt = timeout - Environment.TickCount + t1;
+                        if (dt < 0)
+                            return false;
+                    }
+                } while(true);
+
+                m_locks ++;
+                m_exclusive = false;
+                return true;
+            }
+        }
+
+        /// <summary>
+        /// Acquires the shared lock.
+        /// </summary>
+        public void LockShared() {
+            lock (m_lock) {
+                if (!m_exclusive) {
+                    m_locks++;
+                } else if (m_locks == m_upgrades) {
                     m_exclusive = false;
-                    return true;
+                    m_locks++;
+                } else {
+                    while (m_exclusive && m_locks > m_upgrades)
+                        Monitor.Wait(m_lock);
+
+                    m_locks++;
+                    m_exclusive = false;
                 }
-                return false;
             }
         }
 
-        public void LockShared() {
-            LockShared(-1);
-        }
-
-        public void ReleaseShared() {
+        /// <summary>
+        /// Upgrades the current lock to exclusive level.
+        /// </summary>
+        /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
+        public void Upgrade() {
             lock (m_lock) {
-                if (m_exclusive || m_locks <= 0)
-                    throw new InvalidOperationException();
-                m_locks--;
-                if (m_locks == 0)
-                    Monitor.PulseAll(m_lock);
+                if (!m_exclusive) {
+
+                    if (m_locks <= m_upgrades)
+                        throw new InvalidOperationException();
+
+                    if (m_locks - m_upgrades == 1) {
+                        m_exclusive = true;
+                    } else {
+                        m_upgrades++;
+
+                        while (m_locks > m_upgrades)
+                            Monitor.Wait(m_lock);
+
+                        m_upgrades--;
+                        m_exclusive = true;
+                    }
+                }
             }
         }
 
-        public void ReleaseExclusive() {
+        /// <summary>
+        /// Upgrades the current lock to exclusive level.
+        /// </summary>
+        /// <param name="timeout">Timeout.</param>
+        /// <returns><c>true</c> if the current lock was updated, <c>false</c> the specified timeout was expired.</returns>
+        /// <remarks>If the current lock is exclusive already the method does nothing.</remarks>
+        public bool Upgrade(int timeout) {
             lock (m_lock) {
-                if (!m_exclusive && m_locks != 1)
+                if (m_exclusive)
+                    return true;
+                if (m_locks <= m_upgrades)
                     throw new InvalidOperationException();
-                m_locks = 0;
-                Monitor.PulseAll(m_lock);
+
+                if (m_locks - m_upgrades == 1) {
+                    m_exclusive = true;
+                } else {
+                    var t1 = Environment.TickCount;
+                    var dt = timeout;
+                    m_upgrades++;
+                    do {
+                        if (!Monitor.Wait(m_lock, dt)) {
+                            m_upgrades--;
+                            return false;
+                        }
+
+                        // we may get there but the shared lock already aquired
+                        if (m_locks == m_upgrades)
+                            break;
+
+                        if (timeout >= 0) {
+                            dt = timeout - Environment.TickCount + t1;
+                            if (dt < 0) {
+                                m_upgrades--;
+                                return false;
+                            }
+                        }
+                    } while(true);
+                    m_upgrades--;
+                    m_exclusive = true;
+                }
+                return true;
             }
         }
 
+        /// <summary>
+        /// Downgrades this lock to shared level.
+        /// </summary>
+        public void Downgrade() {
+            lock (m_lock)
+                m_exclusive = false;
+        }
+
+        /// <summary>
+        /// Releases the current lock.
+        /// </summary>
+        public void Release() {
+            lock (m_lock)
+                // if no more running threads left
+                if (--m_locks == m_upgrades)
+                    Monitor.PulseAll(m_lock);
+        }
     }
 }
 
--- a/Implab/PromiseExtensions.cs	Fri Feb 13 02:08:01 2015 +0300
+++ b/Implab/PromiseExtensions.cs	Mon Feb 16 01:14:09 2015 +0300
@@ -101,6 +101,11 @@
             int errors = 0;
             var medium = new Promise();
 
+            if (count == 0) {
+                medium.Resolve();
+                return medium;
+            }
+
             medium.On(() => {
                 foreach(var p2 in that)
                     p2.Cancel();
--- a/MonoPlay/Program.cs	Fri Feb 13 02:08:01 2015 +0300
+++ b/MonoPlay/Program.cs	Mon Feb 16 01:14:09 2015 +0300
@@ -4,6 +4,7 @@
 using Implab;
 using System.Collections.Generic;
 using System.Collections.Concurrent;
+using System.Threading;
 
 namespace MonoPlay {
     class MainClass {
@@ -11,24 +12,82 @@
             if (args == null)
                 throw new ArgumentNullException("args");
 
-            const int count = 10000000;
-
             var t1 = Environment.TickCount;
 
-            for (int i = 0; i < count; i++) {
-                var p = new Promise<int>();
+            const int reads = 100000;
+            const int writes = 1000;
+            const int readThreads = 8;
+            const int writeThreads = 0;
+
+            var l = new SharedLock();
+            var st = new HashSet<int>();
 
-                p.On(x => {}).On(x => {});
+            Action reader1 = () => {
+                for (int i =0; i < reads; i++) {
+                    try {
+                        l.LockShared();
+                        st.Contains(i % 1000);
+                        Thread.Sleep(0);
+                    } finally {
+                        l.Release();
+                    }
+                }
+            };
+
+            Action reader2 = () => {
+                for(var i = 0; i < reads; i++)
+                    lock(st) {
+                        st.Contains(i % 1000);
+                        Thread.Sleep(0);
+                    }
+            };
 
-                p.Resolve(i);
+            Action writer1 = () => {
+                var rnd = new Random(Environment.TickCount);
+                for (int i = 0; i < writes; i++) {
+                    try {
+                        l.LockExclusive();
+                        st.Add(rnd.Next(1000));
+                        //Thread.Sleep(1);
+                    } finally {
+                        l.Release();
+                    }
+                }
+            };
 
-            }
+            Action writer2 = () => {
+                var rnd = new Random(Environment.TickCount);
+                for (int i = 0; i < writes; i++) {
+                    lock (st) {
+                        st.Add(rnd.Next(1000));
+                        //Thread.Sleep(1);
+                    }
+                }
+            };
+
+
 
-           
+            var readers = new IPromise[readThreads];
+            for (int i = 0; i < readThreads; i++)
+                readers[i] = AsyncPool.RunThread(reader1);
+
+            var writers = new IPromise[writeThreads];
+            for (int i = 0; i < writeThreads; i++)
+                writers[i] = AsyncPool.RunThread(writer1);
+
+
+            new [] {
+                readers.Bundle().On(() => Console.WriteLine("readers complete in {0} ms", Environment.TickCount - t1)),
+                writers.Bundle().On(() => Console.WriteLine("writers complete in {0} ms", Environment.TickCount - t1))
+            }.Bundle().Join();
+
+
 
             var t2 = Environment.TickCount;
             Console.WriteLine("done: {0} ms, {1:.00} Mb, {2} GC", t2 - t1, GC.GetTotalMemory(false) / (1024*1024), GC.CollectionCount(0) );
 
         }
+
+
     }
 }