comparison Implab/Parallels/BlockingQueue.cs @ 137:238e15580926 v2

added the blocking queue
author cin
date Mon, 16 Feb 2015 17:48:39 +0300
parents
children 041b77711262
comparison
equal deleted inserted replaced
136:e9e7940c7d98 137:238e15580926
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
/// <summary>
/// An <see cref="AsyncQueue{T}"/> whose consumers can block until data arrives.
/// </summary>
/// <remarks>
/// Producers pulse an internal monitor after every enqueue. Consumers first try
/// to dequeue without taking the lock and, if that fails, re-check and wait
/// while holding the same lock the producers pulse under.
/// </remarks>
public class BlockingQueue<T> : AsyncQueue<T> {
    // Used only to park and wake consumers; the dequeue/enqueue operations
    // themselves are provided by the base class.
    readonly object m_lock = new object();

    /// <summary>
    /// Enqueues a single value and wakes one waiting consumer.
    /// </summary>
    /// <param name="value">The value to enqueue.</param>
    public override void Enqueue(T value) {
        base.Enqueue(value);
        lock (m_lock) {
            Monitor.Pulse(m_lock);
        }
    }

    /// <summary>
    /// Enqueues a range of values and wakes waiting consumers: all of them when
    /// more than one item was requested, otherwise a single one.
    /// </summary>
    /// <param name="data">The source array.</param>
    /// <param name="offset">The index in <paramref name="data"/> of the first item.</param>
    /// <param name="length">The number of items to enqueue.</param>
    public override void EnqueueRange(T[] data, int offset, int length) {
        base.EnqueueRange(data, offset, length);
        lock (m_lock) {
            if (length > 1) {
                Monitor.PulseAll(m_lock);
            } else {
                Monitor.Pulse(m_lock);
            }
        }
    }

    /// <summary>
    /// Dequeues one item, blocking for at most <paramref name="timeout"/>
    /// milliseconds until <c>TryDequeue</c> succeeds.
    /// </summary>
    /// <param name="timeout">
    /// The maximum time to wait, in milliseconds. A negative value disables the
    /// deadline check; the value is passed to <see cref="Monitor.Wait(object, int)"/>
    /// as is, so <see cref="Timeout.Infinite"/> waits indefinitely.
    /// </param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="TimeoutException">No item was obtained in time.</exception>
    public T GetItem(int timeout) {
        T item;

        // Fast path: no locking when data is already there.
        if (TryDequeue(out item)) {
            return item;
        }

        var started = Environment.TickCount;

        // The re-check and the wait share the lock that producers take before
        // pulsing, so a pulse cannot slip in between a failed TryDequeue and
        // Monitor.Wait (assuming an item enqueued by the base class is visible
        // to TryDequeue once Enqueue returns).
        lock (m_lock) {
            while (!TryDequeue(out item)) {
                WaitForPulse(timeout, started);
            }
        }
        return item;
    }

    /// <summary>
    /// Dequeues one item, blocking without a deadline until <c>TryDequeue</c> succeeds.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    public T GetItem() {
        return GetItem(Timeout.Infinite);
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking for at most
    /// <paramref name="timeout"/> milliseconds until <c>TryDequeueRange</c> succeeds.
    /// </summary>
    /// <param name="max">The maximum number of items to return; validated to be at least 1.</param>
    /// <param name="timeout">
    /// The maximum time to wait, in milliseconds. A negative value disables the
    /// deadline check; the value is passed to <see cref="Monitor.Wait(object, int)"/>
    /// as is, so <see cref="Timeout.Infinite"/> waits indefinitely.
    /// </param>
    /// <returns>A new array sized to the number of items actually dequeued.</returns>
    /// <exception cref="TimeoutException">No items were obtained in time.</exception>
    public T[] GetRange(int max, int timeout) {
        Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

        var buffer = new T[max];
        int actual;

        // Fast path first, then the same lock-protected check-and-wait loop
        // as GetItem to avoid missing a producer's pulse.
        if (!TryDequeueRange(buffer, 0, max, out actual)) {
            var started = Environment.TickCount;

            lock (m_lock) {
                while (!TryDequeueRange(buffer, 0, max, out actual)) {
                    WaitForPulse(timeout, started);
                }
            }
        }

        // Shrinks to the dequeued count; leaves the array untouched when it is full.
        Array.Resize(ref buffer, actual);
        return buffer;
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking without a deadline
    /// until <c>TryDequeueRange</c> succeeds.
    /// </summary>
    /// <param name="max">The maximum number of items to return; validated to be at least 1.</param>
    /// <returns>A new array sized to the number of items actually dequeued.</returns>
    public T[] GetRange(int max) {
        return GetRange(max, Timeout.Infinite);
    }

    // Performs one wait on m_lock, which the caller must already hold. Throws
    // TimeoutException when the deadline measured from 'started' has passed or
    // the wait itself times out; returns normally when woken up in time.
    void WaitForPulse(int timeout, int started) {
        var remaining = timeout;

        if (timeout >= 0) {
            // Unchecked so that Environment.TickCount wrap-around cannot
            // overflow in a checked build.
            remaining = unchecked(timeout - (Environment.TickCount - started));
            if (remaining < 0) {
                throw new TimeoutException();
            }
        }

        if (!Monitor.Wait(m_lock, remaining)) {
            throw new TimeoutException();
        }
    }
}
86 }
87