changeset 137:238e15580926 v2

added the blocking queue
author cin
date Mon, 16 Feb 2015 17:48:39 +0300
parents e9e7940c7d98
children f75cfa58e3d4
files Implab/Implab.csproj Implab/Parallels/AsyncQueue.cs Implab/Parallels/BlockingQueue.cs
diffstat 3 files changed, 90 insertions(+), 2 deletions(-) [+]
line wrap: on
line diff
--- a/Implab/Implab.csproj	Mon Feb 16 01:14:09 2015 +0300
+++ b/Implab/Implab.csproj	Mon Feb 16 17:48:39 2015 +0300
@@ -155,6 +155,7 @@
     <Compile Include="Parallels\SharedLock.cs" />
     <Compile Include="Diagnostics\ILogWriter.cs" />
     <Compile Include="Diagnostics\ListenerBase.cs" />
+    <Compile Include="Parallels\BlockingQueue.cs" />
   </ItemGroup>
   <Import Project="$(MSBuildBinPath)\Microsoft.CSharp.targets" />
   <ItemGroup />
--- a/Implab/Parallels/AsyncQueue.cs	Mon Feb 16 01:14:09 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Mon Feb 16 17:48:39 2015 +0300
@@ -158,7 +158,7 @@
         /// Adds the specified value to the queue.
         /// </summary>
         /// <param name="value">Tha value which will be added to the queue.</param>
-        public void Enqueue(T value) {
+        public virtual void Enqueue(T value) {
             var last = m_last;
             // spin wait to the new chunk
             bool extend = true;
@@ -184,7 +184,7 @@
         /// <param name="data">The buffer which contains the data to be enqueued.</param>
         /// <param name="offset">The offset of the data in the buffer.</param>
         /// <param name="length">The size of the data to read from the buffer.</param>
-        public void EnqueueRange(T[] data, int offset, int length) {
+        public virtual void EnqueueRange(T[] data, int offset, int length) {
             if (data == null)
                 throw new ArgumentNullException("data");
             if (length == 0)
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/BlockingQueue.cs	Mon Feb 16 17:48:39 2015 +0300
@@ -0,0 +1,87 @@
+using System;
+using System.Threading;
+
+namespace Implab.Parallels {
+    public class BlockingQueue<T> : AsyncQueue<T> {
+        readonly object m_lock = new object();
+
+        public override void Enqueue(T value) {
+            base.Enqueue(value);
+            lock (m_lock)
+                Monitor.Pulse(m_lock);
+        }
+
+        public override void EnqueueRange(T[] data, int offset, int length) {
+            base.EnqueueRange(data, offset, length);
+            if (length > 1)
+                lock (m_lock)
+                    Monitor.PulseAll(m_lock);
+            else
+                lock (m_lock)
+                    Monitor.Pulse(m_lock);
+        }
+
+        public T GetItem(int timeout) {
+            T item;
+            var t1 = Environment.TickCount;
+            var dt = timeout;
+            while (!TryDequeue(out item)) {
+                lock (m_lock)
+                    if (!Monitor.Wait(m_lock, dt))
+                        throw new TimeoutException();
+                if (timeout >= 0) {
+                    dt = timeout - Environment.TickCount + t1;
+                    if (dt < 0)
+                        throw new TimeoutException();
+                }
+            }
+            return item;
+        }
+
+        public T GetItem() {
+            T item;
+            while (!TryDequeue(out item))
+                lock (m_lock)
+                    Monitor.Wait(m_lock);
+            return item;
+        }
+
+        public T[] GetRange(int max, int timeout) {
+            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+
+            var buffer = new T[max];
+            int actual;
+            var t1 = Environment.TickCount;
+            var dt = timeout;
+            while (!TryDequeueRange(buffer,0,max,out actual)) {
+                lock (m_lock)
+                    if (!Monitor.Wait(m_lock, dt))
+                        throw new TimeoutException();
+                if (timeout >= 0) {
+                    dt = timeout - Environment.TickCount + t1;
+                    if (dt < 0)
+                        throw new TimeoutException();
+                }
+            }
+
+            var data = new T[actual];
+            Array.Copy(buffer, data, actual);
+            return data;
+        }
+
+        public T[] GetRange(int max) {
+            Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
+
+            var buffer = new T[max];
+            int actual;
+            while (!TryDequeueRange(buffer, 0, max, out actual))
+                lock (m_lock)
+                    Monitor.Wait(m_lock);
+
+            var data = new T[actual];
+            Array.Copy(buffer, data, actual);
+            return data;
+        }
+    }
+}
+