Mercurial > pub > ImplabNet
view Implab/Parallels/BlockingQueue.cs @ 259:7d52dc684bbd v3
PollingComponent: implemented correct stopping
author | cin |
---|---|
date | Fri, 13 Apr 2018 03:57:39 +0300 |
parents | 7c7e9ad6fe4a |
children |
line wrap: on
line source
using System;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// Extends <see cref="AsyncQueue{T}"/> with blocking read operations.
    /// </summary>
    /// <remarks>
    /// Readers that find the queue empty sleep on a private monitor. That monitor
    /// is pulsed only by <see cref="EnqueuePulse"/> and
    /// <see cref="EnqueueRangePulse"/>; the inherited enqueue methods have no
    /// access to it and therefore cannot wake a sleeping reader.
    /// </remarks>
    public class BlockingQueue<T> : AsyncQueue<T> {
        // Private monitor used only to park and wake readers.
        readonly object m_lock = new object();

        /// <summary>
        /// Enqueues a single item and wakes one waiting reader.
        /// </summary>
        /// <param name="value">The item to add.</param>
        public void EnqueuePulse(T value) {
            base.Enqueue(value);
            lock (m_lock)
                Monitor.Pulse(m_lock);
        }

        /// <summary>
        /// Enqueues a range of items and wakes waiting readers: all of them when
        /// more than one item was requested, otherwise a single one.
        /// </summary>
        /// <param name="data">The array holding the items.</param>
        /// <param name="offset">Passed through to the inherited <c>EnqueueRange</c>.</param>
        /// <param name="length">Passed through to the inherited <c>EnqueueRange</c>.</param>
        public void EnqueueRangePulse(T[] data, int offset, int length) {
            base.EnqueueRange(data, offset, length);
            lock (m_lock) {
                if (length > 1)
                    Monitor.PulseAll(m_lock);
                else
                    Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Takes one item from the queue, blocking for at most
        /// <paramref name="timeout"/> milliseconds while the queue is empty.
        /// </summary>
        /// <param name="timeout">
        /// The wait budget in milliseconds, handed to
        /// <see cref="Monitor.Wait(object, int)"/>; a negative value is never
        /// decremented (-1 is the infinite timeout of <c>Monitor.Wait</c>).
        /// </param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="TimeoutException">
        /// No item could be dequeued before the timeout elapsed.
        /// </exception>
        public T GetItem(int timeout) {
            T item;
            if (TryDequeue(out item))
                return item;

            var started = Environment.TickCount;
            var remaining = timeout;
            var expired = false;

            lock (m_lock) {
                // The queue is always re-checked after a wake-up, even when the
                // time budget is already spent: this reader may have absorbed the
                // only pulse for an item, and throwing without taking it would
                // leave the item queued while other readers keep sleeping.
                while (!TryDequeue(out item)) {
                    if (expired)
                        throw new TimeoutException();
                    expired = !WaitForPulse(timeout, started, ref remaining);
                }
            }
            return item;
        }

        /// <summary>
        /// Takes one item from the queue, blocking without a time limit while
        /// the queue is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T GetItem() {
            T item;
            if (TryDequeue(out item))
                return item;

            lock (m_lock) {
                while (!TryDequeue(out item))
                    Monitor.Wait(m_lock);
            }
            return item;
        }

        /// <summary>
        /// Takes up to <paramref name="max"/> items from the queue, blocking for
        /// at most <paramref name="timeout"/> milliseconds while nothing can be
        /// dequeued.
        /// </summary>
        /// <param name="max">The maximum number of items to take; must be positive.</param>
        /// <param name="timeout">
        /// The wait budget in milliseconds, handed to
        /// <see cref="Monitor.Wait(object, int)"/>; a negative value is never
        /// decremented (-1 is the infinite timeout of <c>Monitor.Wait</c>).
        /// </param>
        /// <returns>A new array holding exactly the items that were dequeued.</returns>
        /// <exception cref="TimeoutException">
        /// Nothing could be dequeued before the timeout elapsed.
        /// </exception>
        public T[] GetRange(int max, int timeout) {
            Safe.ArgumentInRange(max > 0, nameof(max));

            var buffer = new T[max];
            int actual;
            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                var started = Environment.TickCount;
                var remaining = timeout;
                var expired = false;

                lock (m_lock) {
                    // Same rule as in GetItem(int): retry the dequeue once more
                    // before reporting a timeout so a consumed pulse is not wasted.
                    while (!TryDequeueRange(buffer, 0, max, out actual)) {
                        if (expired)
                            throw new TimeoutException();
                        expired = !WaitForPulse(timeout, started, ref remaining);
                    }
                }
            }
            return TrimBuffer(buffer, actual);
        }

        /// <summary>
        /// Takes up to <paramref name="max"/> items from the queue, blocking
        /// without a time limit while nothing can be dequeued.
        /// </summary>
        /// <param name="max">The maximum number of items to take; must be positive.</param>
        /// <returns>A new array holding exactly the items that were dequeued.</returns>
        public T[] GetRange(int max) {
            Safe.ArgumentInRange(max > 0, nameof(max));

            var buffer = new T[max];
            int actual;
            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                lock (m_lock) {
                    while (!TryDequeueRange(buffer, 0, max, out actual))
                        Monitor.Wait(m_lock);
                }
            }
            return TrimBuffer(buffer, actual);
        }

        // Waits on m_lock (which the caller must hold) for the remaining budget and
        // then refreshes that budget. Returns false once the budget is exhausted.
        bool WaitForPulse(int timeout, int started, ref int remaining) {
            if (!Monitor.Wait(m_lock, remaining))
                return false;

            // A negative timeout is passed to Monitor.Wait unchanged on every turn.
            if (timeout < 0)
                return true;

            // Environment.TickCount is a wrapping 32-bit counter; the subtraction is
            // kept unchecked so the elapsed time stays correct across a wrap-around
            // even if the assembly is compiled with overflow checking.
            remaining = unchecked(timeout - (Environment.TickCount - started));
            return remaining >= 0;
        }

        // Returns an array of exactly `count` items taken from the start of `buffer`;
        // a completely filled buffer is handed out as is to avoid a second copy.
        static T[] TrimBuffer(T[] buffer, int count) {
            if (count == buffer.Length)
                return buffer;

            var data = new T[count];
            Array.Copy(buffer, data, count);
            return data;
        }
    }
}