view Implab/Parallels/BlockingQueue.cs @ 246:5aa9cfbe56c3 v3

missing files
author cin
date Fri, 26 Jan 2018 11:19:38 +0300
parents d6fe09f5592c
children 7c7e9ad6fe4a
line wrap: on
line source

using System;
using System.Threading;

namespace Implab.Parallels {
    /// <summary>
    /// An <see cref="AsyncQueue{T}"/> extended with blocking reads. Consumers that find
    /// the queue empty wait on a private monitor; producers wake them by enqueueing
    /// through <see cref="EnqueuePulse"/> or <see cref="EnqueueRangePulse"/>.
    /// </summary>
    /// <remarks>
    /// Within this class only the <c>*Pulse</c> methods signal the monitor, so blocked
    /// readers are woken by those methods only.
    /// </remarks>
    public class BlockingQueue<T> : AsyncQueue<T> {
        // Private monitor object: readers wait on it, the *Pulse methods signal it.
        readonly object m_lock = new object();

        /// <summary>
        /// Enqueues a single value and then wakes one waiting reader, if any.
        /// </summary>
        /// <param name="value">The value to enqueue.</param>
        public void EnqueuePulse(T value) {
            base.Enqueue(value);
            lock (m_lock) {
                Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Enqueues a range of values and then wakes waiting readers: all of them when
        /// more than one element was requested, otherwise a single one.
        /// </summary>
        /// <param name="data">The source array, passed to the base <c>EnqueueRange</c>.</param>
        /// <param name="offset">The offset passed to the base <c>EnqueueRange</c>.</param>
        /// <param name="length">The length passed to the base <c>EnqueueRange</c>.</param>
        public void EnqueueRangePulse(T[] data, int offset, int length) {
            base.EnqueueRange(data, offset, length);
            lock (m_lock) {
                if (length > 1)
                    Monitor.PulseAll(m_lock);
                else
                    Monitor.Pulse(m_lock);
            }
        }

        /// <summary>
        /// Dequeues one item, blocking for at most <paramref name="timeout"/>
        /// milliseconds while the queue is empty.
        /// </summary>
        /// <param name="timeout">
        /// Maximum time to wait in milliseconds. A negative value is handed to
        /// <see cref="Monitor.Wait(object, int)"/> unchanged, so
        /// <see cref="Timeout.Infinite"/> waits without a deadline.
        /// </param>
        /// <returns>The dequeued item.</returns>
        /// <exception cref="TimeoutException">
        /// The timeout elapsed and the queue was still empty.
        /// </exception>
        public T GetItem(int timeout) {
            T item;

            // Fast path: no locking when an item is immediately available.
            if (TryDequeue(out item))
                return item;

            var started = Environment.TickCount;

            lock (m_lock) {
                while (!TryDequeue(out item)) {
                    if (WaitForPulse(timeout, started))
                        continue;

                    // The deadline has passed, but an item may have been enqueued while
                    // this thread was timing out or re-acquiring the lock: look once
                    // more before reporting a timeout.
                    if (TryDequeue(out item))
                        break;

                    throw new TimeoutException();
                }
            }
            return item;
        }

        /// <summary>
        /// Dequeues one item, blocking without a deadline while the queue is empty.
        /// </summary>
        /// <returns>The dequeued item.</returns>
        public T GetItem() {
            T item;

            if (TryDequeue(out item))
                return item;

            lock (m_lock) {
                // The queue is re-checked under the lock after every wake-up.
                while (!TryDequeue(out item))
                    Monitor.Wait(m_lock);
            }
            return item;
        }

        /// <summary>
        /// Dequeues up to <paramref name="max"/> items, blocking for at most
        /// <paramref name="timeout"/> milliseconds while nothing can be dequeued.
        /// </summary>
        /// <param name="max">
        /// Maximum number of items to return; checked with <c>Safe.ArgumentInRange</c>
        /// against the range 1..<see cref="Int32.MaxValue"/> (presumably throwing when
        /// out of range - confirm against <c>Safe</c>).
        /// </param>
        /// <param name="timeout">
        /// Maximum time to wait in milliseconds. A negative value is handed to
        /// <see cref="Monitor.Wait(object, int)"/> unchanged, so
        /// <see cref="Timeout.Infinite"/> waits without a deadline.
        /// </param>
        /// <returns>A new array holding exactly the items that were dequeued.</returns>
        /// <exception cref="TimeoutException">
        /// The timeout elapsed and nothing could be dequeued.
        /// </exception>
        public T[] GetRange(int max, int timeout) {
            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

            var buffer = new T[max];
            int actual;

            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                var started = Environment.TickCount;

                lock (m_lock) {
                    while (!TryDequeueRange(buffer, 0, max, out actual)) {
                        if (WaitForPulse(timeout, started))
                            continue;

                        // Last chance: items may have arrived right at the deadline.
                        if (TryDequeueRange(buffer, 0, max, out actual))
                            break;

                        throw new TimeoutException();
                    }
                }
            }

            return Trim(buffer, actual);
        }

        /// <summary>
        /// Dequeues up to <paramref name="max"/> items, blocking without a deadline
        /// while nothing can be dequeued.
        /// </summary>
        /// <param name="max">
        /// Maximum number of items to return; checked with <c>Safe.ArgumentInRange</c>
        /// against the range 1..<see cref="Int32.MaxValue"/> (presumably throwing when
        /// out of range - confirm against <c>Safe</c>).
        /// </param>
        /// <returns>A new array holding exactly the items that were dequeued.</returns>
        public T[] GetRange(int max) {
            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

            var buffer = new T[max];
            int actual;

            if (!TryDequeueRange(buffer, 0, max, out actual)) {
                lock (m_lock) {
                    while (!TryDequeueRange(buffer, 0, max, out actual))
                        Monitor.Wait(m_lock);
                }
            }

            return Trim(buffer, actual);
        }

        // Waits on m_lock (which the caller must hold) for a pulse, for no longer than
        // what is left of `timeout` counted from the tick count `started`. Returns false
        // when the deadline has passed, true when the wait ended before it.
        bool WaitForPulse(int timeout, int started) {
            var remaining = timeout;

            if (timeout >= 0) {
                // Tick counts wrap around; unchecked subtraction keeps the elapsed time
                // correct across the wrap and avoids OverflowException in checked builds.
                remaining = unchecked(timeout - (Environment.TickCount - started));
                if (remaining < 0)
                    return false;
            }

            return Monitor.Wait(m_lock, remaining);
        }

        // Returns an array containing exactly the first `count` elements of `buffer`;
        // the buffer itself is returned when it is completely filled, avoiding a copy.
        static T[] Trim(T[] buffer, int count) {
            if (count == buffer.Length)
                return buffer;

            var data = new T[count];
            Array.Copy(buffer, data, count);
            return data;
        }
    }
}