Mercurial > pub > ImplabNet
diff Implab/Parallels/BlockingQueue.cs @ 137:238e15580926 v2
added the blocking queue
author | cin |
---|---|
date | Mon, 16 Feb 2015 17:48:39 +0300 |
parents | |
children | 041b77711262 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/BlockingQueue.cs Mon Feb 16 17:48:39 2015 +0300 @@ -0,0 +1,87 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class BlockingQueue<T> : AsyncQueue<T> { + readonly object m_lock = new object(); + + public override void Enqueue(T value) { + base.Enqueue(value); + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public override void EnqueueRange(T[] data, int offset, int length) { + base.EnqueueRange(data, offset, length); + if (length > 1) + lock (m_lock) + Monitor.PulseAll(m_lock); + else + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public T GetItem(int timeout) { + T item; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeue(out item)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + return item; + } + + public T GetItem() { + T item; + while (!TryDequeue(out item)) + lock (m_lock) + Monitor.Wait(m_lock); + return item; + } + + public T[] GetRange(int max, int timeout) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + var t1 = Environment.TickCount; + var dt = timeout; + while (!TryDequeueRange(buffer,0,max,out actual)) { + lock (m_lock) + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + + public T[] GetRange(int max) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + while (!TryDequeueRange(buffer, 0, max, out actual)) + lock (m_lock) + Monitor.Wait(m_lock); + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + } +} +