Mercurial > pub > ImplabNet
changeset 137:238e15580926 v2
added the blocking queue
author | cin |
---|---|
date | Mon, 16 Feb 2015 17:48:39 +0300 (2015-02-16) |
parents | e9e7940c7d98 |
children | f75cfa58e3d4 |
files | Implab/Implab.csproj Implab/Parallels/AsyncQueue.cs Implab/Parallels/BlockingQueue.cs |
diffstat | 3 files changed, 90 insertions(+), 2 deletions(-) [+] |
line wrap: on
line diff
--- a/Implab/Implab.csproj Mon Feb 16 01:14:09 2015 +0300 +++ b/Implab/Implab.csproj Mon Feb 16 17:48:39 2015 +0300 @@ -155,6 +155,7 @@ <Compile Include="Parallels\SharedLock.cs" /> <Compile Include="Diagnostics\ILogWriter.cs" /> <Compile Include="Diagnostics\ListenerBase.cs" /> + <Compile Include="Parallels\BlockingQueue.cs" /> </ItemGroup> <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" /> <ItemGroup />
--- a/Implab/Parallels/AsyncQueue.cs Mon Feb 16 01:14:09 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Feb 16 17:48:39 2015 +0300 @@ -158,7 +158,7 @@ /// Adds the specified value to the queue. /// </summary> /// <param name="value">Tha value which will be added to the queue.</param> - public void Enqueue(T value) { + public virtual void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; @@ -184,7 +184,7 @@ /// <param name="data">The buffer which contains the data to be enqueued.</param> /// <param name="offset">The offset of the data in the buffer.</param> /// <param name="length">The size of the data to read from the buffer.</param> - public void EnqueueRange(T[] data, int offset, int length) { + public virtual void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); if (length == 0)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/BlockingQueue.cs Mon Feb 16 17:48:39 2015 +0300 @@ -0,0 +1,87 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class BlockingQueue<T> : AsyncQueue<T> { + readonly object m_lock = new object(); + + public override void Enqueue(T value) { + base.Enqueue(value); + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public override void EnqueueRange(T[] data, int offset, int length) { + base.EnqueueRange(data, offset, length); + if (length > 1) + lock (m_lock) + Monitor.PulseAll(m_lock); + else + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public T GetItem(int timeout) { + T item; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeue(out item)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + return item; + } + + public T GetItem() { + T item; + while (!TryDequeue(out item)) + lock (m_lock) + Monitor.Wait(m_lock); + return item; + } + + public T[] GetRange(int max, int timeout) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeueRange(buffer,0,max,out actual)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + + public T[] GetRange(int max) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + while (!TryDequeueRange(buffer, 0, max, out actual)) + lock (m_lock) + Monitor.Wait(m_lock); + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + } +} +