diff Implab/Parallels/BlockingQueue.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents 041b77711262
children d6fe09f5592c
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/BlockingQueue.cs	Fri Apr 22 13:10:34 2016 +0300
@@ -0,0 +1,101 @@
+using System;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public class BlockingQueue<T> : AsyncQueue<T> {
+        readonly object m_lock = new object();
+
+        public override void Enqueue(T value) {
+            base.Enqueue(value);
+            lock (m_lock)
+                Monitor.Pulse(m_lock);
+        }
+
+        public override void EnqueueRange(T[] data, int offset, int length) {
+            base.EnqueueRange(data, offset, length);
+            if (length > 1)
+                lock (m_lock)
+                    Monitor.PulseAll(m_lock);
+            else
+                lock (m_lock)
+                    Monitor.Pulse(m_lock);
+        }
+
+        public T GetItem(int timeout) {
+            T item;
+
+            if (!TryDequeue(out item)) {
+                var t1 = Environment.TickCount;
+                var dt = timeout;
+
+                lock (m_lock) {
+                    while (!TryDequeue(out item)) {
+                        if (!Monitor.Wait(m_lock, dt))
+                            throw new TimeoutException();
+                        if (timeout >= 0) {
+                            dt = timeout - Environment.TickCount + t1;
+                            if (dt < 0)
+                                throw new TimeoutException();
+                        }
+                    }
+                }
+            }
+            return item;
+        }
+
+        public T GetItem() {
+            T item;
+            if (!TryDequeue(out item))
+                lock (m_lock) {
+                    while (!TryDequeue(out item))
+                        Monitor.Wait(m_lock);
+                }
+            return item;
+        }
+
+        public T[] GetRange(int max, int timeout) {
+            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+
+            var buffer = new T[max];
+            int actual;
+            if (!TryDequeueRange(buffer, 0, max, out actual)) {
+                var t1 = Environment.TickCount;
+                var dt = timeout;
+
+                lock (m_lock) {
+                    while (!TryDequeueRange(buffer, 0, max, out actual)) {
+                
+                        if (!Monitor.Wait(m_lock, dt))
+                            throw new TimeoutException();
+
+                        if (timeout >= 0) {
+                            dt = timeout - Environment.TickCount + t1;
+                            if (dt < 0)
+                                throw new TimeoutException();
+                        }
+                    }
+                }
+            }
+
+            var data = new T[actual];
+            Array.Copy(buffer, data, actual);
+            return data;
+        }
+
+        public T[] GetRange(int max) {
+            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+
+            var buffer = new T[max];
+            int actual;
+            if (!TryDequeueRange(buffer, 0, max, out actual))
+                lock (m_lock)
+                    while (!TryDequeueRange(buffer, 0, max, out actual))
+                        Monitor.Wait(m_lock);
+
+            var data = new T[actual];
+            Array.Copy(buffer, data, actual);
+            return data;
+        }
+    }
+}
+