Mercurial > pub > ImplabNet
diff Implab/Parallels/BlockingQueue.cs @ 192:f1da3afc3521 release v2.1
Слияние с v2
author | cin |
---|---|
date | Fri, 22 Apr 2016 13:10:34 +0300 |
parents | 041b77711262 |
children | d6fe09f5592c |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Implab/Parallels/BlockingQueue.cs Fri Apr 22 13:10:34 2016 +0300 @@ -0,0 +1,101 @@ +using System; +using System.Threading; + +namespace Implab.Parallels { + public class BlockingQueue<T> : AsyncQueue<T> { + readonly object m_lock = new object(); + + public override void Enqueue(T value) { + base.Enqueue(value); + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public override void EnqueueRange(T[] data, int offset, int length) { + base.EnqueueRange(data, offset, length); + if (length > 1) + lock (m_lock) + Monitor.PulseAll(m_lock); + else + lock (m_lock) + Monitor.Pulse(m_lock); + } + + public T GetItem(int timeout) { + T item; + + if (!TryDequeue(out item)) { + var t1 = Environment.TickCount; + var dt = timeout; + + lock (m_lock) { + while (!TryDequeue(out item)) { + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + } + } + return item; + } + + public T GetItem() { + T item; + if (!TryDequeue(out item)) + lock (m_lock) { + while (!TryDequeue(out item)) + Monitor.Wait(m_lock); + } + return item; + } + + public T[] GetRange(int max, int timeout) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + if (!TryDequeueRange(buffer, 0, max, out actual)) { + var t1 = Environment.TickCount; + var dt = timeout; + + lock (m_lock) { + while (!TryDequeueRange(buffer, 0, max, out actual)) { + + if (!Monitor.Wait(m_lock, dt)) + throw new TimeoutException(); + + if (timeout >= 0) { + dt = timeout - Environment.TickCount + t1; + if (dt < 0) + throw new TimeoutException(); + } + } + } + } + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + + public T[] GetRange(int max) { + Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max"); + + var buffer = new T[max]; + int actual; + if (!TryDequeueRange(buffer, 0, max, out actual)) + lock (m_lock) + while (!TryDequeueRange(buffer, 0, max, out actual)) + Monitor.Wait(m_lock); + + var data = new T[actual]; + Array.Copy(buffer, data, actual); + return data; + } + } +} +