Mercurial > pub > ImplabNet
changeset 129:471f596b2603 v2
Added SharedLock to synchronization routines
author | cin |
---|---|
date | Thu, 29 Jan 2015 18:31:06 +0300 |
parents | 6241bff0cd64 |
children | 671f60cd0250 |
files | Implab/Implab.csproj Implab/Parallels/AsyncQueue.cs Implab/Parallels/MTCustomQueue.cs Implab/Parallels/MTCustomQueueNode.cs Implab/Parallels/SharedLock.cs Implab/Parallels/Signal.cs Implab/Parallels/WorkerPool.cs |
diffstat | 7 files changed, 82 insertions(+), 149 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab/Implab.csproj Thu Jan 29 05:09:31 2015 +0300 +++ b/Implab/Implab.csproj Thu Jan 29 18:31:06 2015 +0300 @@ -143,8 +143,6 @@ <Compile Include="Diagnostics\Extensions.cs" /> <Compile Include="IComponentContainer.cs" /> <Compile Include="PromiseEventType.cs" /> - <Compile Include="Parallels\MTCustomQueue.cs" /> - <Compile Include="Parallels\MTCustomQueueNode.cs" /> <Compile Include="ComponentContainer.cs" /> <Compile Include="DisposablePool.cs" /> <Compile Include="ObjectPool.cs" /> @@ -156,6 +154,7 @@ <Compile Include="Promise.cs" /> <Compile Include="PromiseTransientException.cs" /> <Compile Include="Parallels\Signal.cs" /> + <Compile Include="Parallels\SharedLock.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup />
--- a/Implab/Parallels/AsyncQueue.cs Thu Jan 29 05:09:31 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Thu Jan 29 18:31:06 2015 +0300 @@ -495,11 +495,11 @@ #region ICollection implementation public void Add(T item) { - throw new InvalidOperationException(); + throw new NotSupportedException(); } public void Clear() { - throw new InvalidOperationException(); + throw new NotSupportedException(); } public bool Contains(T item) { @@ -511,7 +511,7 @@ } public bool Remove(T item) { - throw new NotImplementedException(); + throw new NotSupportedException(); } public int Count {
--- a/Implab/Parallels/MTCustomQueue.cs Thu Jan 29 05:09:31 2015 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading; -using System.Collections; - -namespace Implab.Parallels { - public class MTCustomQueue<TNode> : IEnumerable<TNode> where TNode : MTCustomQueueNode<TNode> { - TNode m_first; - TNode m_last; - - public void Enqueue(TNode next) { - Thread.MemoryBarrier(); - - var last = m_last; - - // Interlocaked.CompareExchange implies Thread.MemoryBarrier(); - // to ensure that the next node is completely constructed - while (last != Interlocked.CompareExchange(ref m_last, next, last)) - last = m_last; - - if (last != null) - last.next = next; - else - m_first = next; - } - - public bool TryDequeue(out TNode node) { - TNode first; - TNode next; - node = null; - - Thread.MemoryBarrier(); - do { - first = m_first; - if (first == null) - return false; - next = first.next; - if (next == null) { - // this is the last element, - // then try to update the tail - if (first != Interlocked.CompareExchange(ref m_last, null, first)) { - // this is the race condition - if (m_last == null) - // the queue is empty - return false; - // tail has been changed, we need to restart - continue; - } - - // tail succesfully updated and first.next will never be changed - // other readers will fail due to inconsistency m_last != m_fist && m_first.next == null - // however the parallel writer may update the m_first since the m_last is null - - // so we need to fix inconsistency by setting m_first to null or if it has been - // updated by the writer already then we should just to give up - Interlocked.CompareExchange(ref m_first, null, first); - break; - - } - if (first == Interlocked.CompareExchange(ref m_first, next, first)) - // head succesfully updated - break; - } while (true); - - node = first; - return true; - } - - #region IEnumerable implementation - - class Enumerator : IEnumerator<TNode> { - TNode m_current; - TNode m_first; - - public Enumerator(TNode first) { - m_first = first; - } - - #region IEnumerator implementation - - public bool MoveNext() { - m_current = m_current == null ? m_first : m_current.next; - return m_current != null; - } - - public void Reset() { - m_current = null; - } - - object IEnumerator.Current { - get { - if (m_current == null) - throw new InvalidOperationException(); - return m_current; - } - } - - #endregion - - #region IDisposable implementation - - public void Dispose() { - } - - #endregion - - #region IEnumerator implementation - - public TNode Current { - get { - if (m_current == null) - throw new InvalidOperationException(); - return m_current; - } - } - - #endregion - } - - public IEnumerator<TNode> GetEnumerator() { - return new Enumerator(m_first); - } - - #endregion - - #region IEnumerable implementation - - IEnumerator IEnumerable.GetEnumerator() { - return GetEnumerator(); - } - - #endregion - } -} -
--- a/Implab/Parallels/MTCustomQueueNode.cs Thu Jan 29 05:09:31 2015 +0300 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,6 +0,0 @@ -namespace Implab.Parallels { - public class MTCustomQueueNode<TNode> where TNode : MTCustomQueueNode<TNode> { - public TNode next; - } -} -
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/SharedLock.cs Thu Jan 29 18:31:06 2015 +0300 @@ -0,0 +1,75 @@ +using System; +using System.Threading; +using System.Diagnostics; + +namespace Implab.Parallels { + /// <summary> + /// Implements a lightweight mechanism to aquire a shared or an exclusive lock. + /// </summary> + public class SharedLock { + readonly object m_lock = new object(); + int m_locks; + bool m_exclusive; + + public bool LockExclusive(int timeout) { + lock (m_lock) { + if (m_locks > 0 && !Monitor.Wait(m_lock, timeout)) + return false; + m_exclusive = true; + m_locks = 1; + } + } + + public void LockExclusive() { + LockExclusive(-1); + } + + public bool LockShared(int timeout) { + lock (m_lock) { + if (!m_exclusive) { + m_locks++; + return true; + } + + if (m_lock == 0) { + m_exclusive = false; + m_locks = 1; + return true; + } + + if (Monitor.Wait(m_lock, timeout)) { + Debug.Assert(m_locks == 0); + m_locks = 1; + m_exclusive = false; + return true; + } + return false; + } + } + + public void LockShared() { + LockShared(-1); + } + + public void ReleaseShared() { + lock (m_lock) { + if (m_exclusive || m_locks <= 0) + throw new InvalidOperationException(); + m_locks--; + if (m_locks == 0) + Monitor.PulseAll(m_lock); + } + } + + public void ReleaseExclusive() { + lock (m_lock) { + if (!m_exclusive && m_locks != 1) + throw new InvalidOperationException(); + m_locks = 0; + Monitor.PulseAll(m_lock); + } + } + + } +} +
--- a/Implab/Parallels/Signal.cs Thu Jan 29 05:09:31 2015 +0300 +++ b/Implab/Parallels/Signal.cs Thu Jan 29 18:31:06 2015 +0300 @@ -3,7 +3,7 @@ namespace Implab.Parallels { /// <summary> - /// Implements simple signalling logic using <see cref="Monitor.PulseAll(object)"/>. + /// Implements a simple signalling logic using <see cref="Monitor.PulseAll(object)"/>. /// </summary> public class Signal { readonly object m_lock = new object();
--- a/Implab/Parallels/WorkerPool.cs Thu Jan 29 05:09:31 2015 +0300 +++ b/Implab/Parallels/WorkerPool.cs Thu Jan 29 18:31:06 2015 +0300 @@ -7,7 +7,7 @@ public class WorkerPool : DispatchPool<Action> { AsyncQueue<Action> m_queue = new AsyncQueue<Action>(); - int m_queueLength = 0; + int m_queueLength; readonly int m_threshold = 1; public WorkerPool(int minThreads, int maxThreads, int threshold) @@ -40,7 +40,7 @@ var lop = TraceContext.Instance.CurrentOperation; - EnqueueTask(delegate() { + EnqueueTask(delegate { TraceContext.Instance.EnterLogicalOperation(lop, false); try { promise.Resolve(task());