Mercurial > pub > ImplabNet
view Implab/Parallels/BlockingQueue.cs @ 138:f75cfa58e3d4 v2
added ICancellable.Cancel(Exception) to allow specifying the reason for cancellation
author | cin |
---|---|
date | Tue, 17 Feb 2015 18:16:26 +0300 |
parents | 238e15580926 |
children | 041b77711262 |
line wrap: on
line source
using System;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// Extends <see cref="AsyncQueue{T}"/> with consumer operations that block the
    /// calling thread until data can be dequeued or a timeout expires.
    /// </summary>
    /// <remarks>
    /// Producers signal <c>m_lock</c> after every enqueue. Consumers always re-test the
    /// queue while holding <c>m_lock</c> before waiting, so an item enqueued between a
    /// failed dequeue attempt and the wait cannot leave the consumer asleep.
    /// NOTE(review): this relies on the inherited TryDequeue/TryDequeueRange never
    /// acquiring <c>m_lock</c> themselves; the base class is not visible here - confirm.
    /// </remarks>
    public class BlockingQueue<T> : AsyncQueue<T> {
        // Used only to park and wake consumers; the enqueue calls to the base class
        // are made outside of this lock.
        readonly object m_lock = new object();

        /// <summary>
        /// Enqueues a single value and wakes one waiting consumer, if any.
        /// </summary>
        /// <param name="value">The value to enqueue.</param>
        public override void Enqueue(T value) {
            base.Enqueue(value);
            lock (m_lock)
                Monitor.Pulse(m_lock);
        }

        /// <summary>
        /// Enqueues a range of values and wakes waiting consumers: all of them when
        /// <paramref name="length"/> is greater than one, otherwise a single one.
        /// </summary>
        /// <param name="data">The source array passed to the base implementation.</param>
        /// <param name="offset">The offset passed to the base implementation.</param>
        /// <param name="length">The number of items passed to the base implementation.</param>
        public override void EnqueueRange(T[] data, int offset, int length) {
            base.EnqueueRange(data, offset, length);
            lock (m_lock) {
                if (length > 1)
                    Monitor.PulseAll(m_lock);
                else
                    Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Dequeues one item, blocking for at most <paramref name="timeout"/> milliseconds.
        /// </summary>
        /// <param name="timeout">
        /// Maximum time to wait in milliseconds; <see cref="Timeout.Infinite"/> waits
        /// without a limit.
        /// </param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="TimeoutException">No item could be dequeued in time.</exception>
        public T GetItem(int timeout) {
            T item;

            // Fast path: no locking when an item can be taken right away.
            if (TryDequeue(out item))
                return item;

            var started = Environment.TickCount;

            lock (m_lock) {
                // The test is repeated under the lock: a producer needs the same lock
                // to pulse, so its signal cannot slip in between the test and the wait.
                while (!TryDequeue(out item))
                    WaitForSignal(timeout, started);
            }

            return item;
        }

        /// <summary>
        /// Dequeues one item, blocking until one can be dequeued.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T GetItem() {
            return GetItem(Timeout.Infinite);
        }

        /// <summary>
        /// Dequeues items into a new array, blocking for at most
        /// <paramref name="timeout"/> milliseconds until the dequeue succeeds.
        /// </summary>
        /// <param name="max">
        /// Size of the buffer handed to <c>TryDequeueRange</c>; validated to be within
        /// 1..<see cref="Int32.MaxValue"/> by <c>Safe.ArgumentInRange</c>.
        /// </param>
        /// <param name="timeout">
        /// Maximum time to wait in milliseconds; <see cref="Timeout.Infinite"/> waits
        /// without a limit.
        /// </param>
        /// <returns>
        /// An array whose length equals the item count reported by <c>TryDequeueRange</c>.
        /// </returns>
        /// <exception cref="TimeoutException">Nothing could be dequeued in time.</exception>
        public T[] GetRange(int max, int timeout) {
            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

            var buffer = new T[max];
            int actual;

            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                var started = Environment.TickCount;

                lock (m_lock) {
                    // Same protocol as in GetItem: test under the lock, then wait.
                    while (!TryDequeueRange(buffer, 0, max, out actual))
                        WaitForSignal(timeout, started);
                }
            }

            // A completely filled buffer is already the result, no copy is needed.
            if (actual == max)
                return buffer;

            var data = new T[actual];
            Array.Copy(buffer, data, actual);
            return data;
        }

        /// <summary>
        /// Dequeues items into a new array, blocking until the dequeue succeeds.
        /// </summary>
        /// <param name="max">
        /// Size of the buffer handed to <c>TryDequeueRange</c>; validated to be within
        /// 1..<see cref="Int32.MaxValue"/> by <c>Safe.ArgumentInRange</c>.
        /// </param>
        /// <returns>
        /// An array whose length equals the item count reported by <c>TryDequeueRange</c>.
        /// </returns>
        public T[] GetRange(int max) {
            return GetRange(max, Timeout.Infinite);
        }

        // Waits on m_lock for a producer signal; the caller must hold m_lock.
        // Throws TimeoutException when the wait itself times out.
        void WaitForSignal(int timeout, int started) {
            var remaining = timeout;

            if (timeout >= 0) {
                // The tick counter wraps around, the unchecked difference is still
                // the elapsed time as long as it is shorter than ~24.8 days.
                var elapsed = unchecked(Environment.TickCount - started);

                // An expired deadline becomes a zero wait instead of an immediate
                // throw, so the caller makes one more dequeue attempt and a signal
                // that has just been received is not discarded.
                if (elapsed < 0 || elapsed >= timeout)
                    remaining = 0;
                else
                    remaining = timeout - elapsed;
            }

            if (!Monitor.Wait(m_lock, remaining))
                throw new TimeoutException();
        }
    }
}