# HG changeset patch # User cin # Date 1424098119 -10800 # Node ID 238e15580926ee5d0388c090e6e1602f20f68456 # Parent e9e7940c7d98aab2fe347fd4fd25e8ad5fe60a19 added the blocking queue diff -r e9e7940c7d98 -r 238e15580926 Implab/Implab.csproj --- a/Implab/Implab.csproj Mon Feb 16 01:14:09 2015 +0300 +++ b/Implab/Implab.csproj Mon Feb 16 17:48:39 2015 +0300 @@ -155,6 +155,7 @@ + diff -r e9e7940c7d98 -r 238e15580926 Implab/Parallels/AsyncQueue.cs --- a/Implab/Parallels/AsyncQueue.cs Mon Feb 16 01:14:09 2015 +0300 +++ b/Implab/Parallels/AsyncQueue.cs Mon Feb 16 17:48:39 2015 +0300 @@ -158,7 +158,7 @@ /// Adds the specified value to the queue. /// /// Tha value which will be added to the queue. - public void Enqueue(T value) { + public virtual void Enqueue(T value) { var last = m_last; // spin wait to the new chunk bool extend = true; @@ -184,7 +184,7 @@ /// The buffer which contains the data to be enqueued. /// The offset of the data in the buffer. /// The size of the data to read from the buffer. - public void EnqueueRange(T[] data, int offset, int length) { + public virtual void EnqueueRange(T[] data, int offset, int length) { if (data == null) throw new ArgumentNullException("data"); if (length == 0) diff -r e9e7940c7d98 -r 238e15580926 Implab/Parallels/BlockingQueue.cs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/BlockingQueue.cs Mon Feb 16 17:48:39 2015 +0300 @@ -0,0 +1,87 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class BlockingQueue : AsyncQueue { + readonly object m_lock = new object(); + + public override void Enqueue(T value) { + base.Enqueue(value); + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public override void EnqueueRange(T[] data, int offset, int length) { + base.EnqueueRange(data, offset, length); + if (length > 1) + lock (m_lock) + Monitor.PulseAll(m_lock); + else + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public T GetItem(int timeout) { + T item; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeue(out item)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + return item; + } + + public T GetItem() { + T item; + while (!TryDequeue(out item)) + lock (m_lock) + Monitor.Wait(m_lock); + return item; + } + + public T[] GetRange(int max, int timeout) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeueRange(buffer,0,max,out actual)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + + public T[] GetRange(int max) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + while (!TryDequeueRange(buffer, 0, max, out actual)) + lock (m_lock) + Monitor.Wait(m_lock); + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + } +} +