137
|
1 using System;
|
|
2 using System.Threading;
|
|
3
|
|
4 namespace Implab.Parallels {
|
|
5 public class BlockingQueue<T> : AsyncQueue<T> {
|
|
6 readonly object m_lock = new object();
|
|
7
|
233
|
/// <summary>
/// Adds <paramref name="value"/> to the queue through the base
/// <c>Enqueue</c> and then signals one thread waiting on the internal lock.
/// </summary>
/// <param name="value">The item to enqueue.</param>
public void EnqueuePulse(T value) {
    // Publish the item first, so it is already queued when a waiter wakes up.
    base.Enqueue(value);

    // Monitor.Pulse must be called while the monitor is held.
    lock (m_lock) {
        Monitor.Pulse(m_lock);
    }
}
|
|
13
|
233
|
/// <summary>
/// Forwards the arguments to the base <c>EnqueueRange</c> and then signals
/// the threads waiting on the internal lock.
/// </summary>
/// <param name="data">Source array passed to <c>EnqueueRange</c>.</param>
/// <param name="offset">Passed to <c>EnqueueRange</c> unchanged (presumably the index of the first item in <paramref name="data"/> - confirm against the base class).</param>
/// <param name="length">Passed to <c>EnqueueRange</c> unchanged (presumably the number of items - confirm against the base class); also selects how many waiters are woken.</param>
public void EnqueueRangePulse(T[] data, int offset, int length) {
    base.EnqueueRange(data, offset, length);

    lock (m_lock) {
        if (length <= 1) {
            // At most one item requested: waking a single waiter is enough.
            Monitor.Pulse(m_lock);
        } else {
            // More than one item requested: wake every waiter.
            Monitor.PulseAll(m_lock);
        }
    }
}
|
|
23
|
|
/// <summary>
/// Dequeues a single item, blocking the calling thread until one becomes
/// available or the timeout elapses.
/// </summary>
/// <param name="timeout">
/// Maximum time to wait, in milliseconds. The value is handed to
/// <see cref="Monitor.Wait(object, int)"/>, so <see cref="Timeout.Infinite"/>
/// (-1) waits indefinitely. Elapsed-time tracking is only done for
/// non-negative values.
/// </param>
/// <returns>The dequeued item.</returns>
/// <exception cref="TimeoutException">
/// The timeout elapsed and the queue was still empty.
/// </exception>
public T GetItem(int timeout) {
    T item;

    // Fast path: take an item without touching the lock.
    if (TryDequeue(out item))
        return item;

    var started = Environment.TickCount;
    var remaining = timeout;
    var expired = false;

    lock (m_lock) {
        // The queue is always re-checked before giving up, so an item that
        // arrived while the wait was timing out is returned, not left behind.
        while (!TryDequeue(out item)) {
            if (expired)
                throw new TimeoutException();

            if (!Monitor.Wait(m_lock, remaining)) {
                // The wait timed out; make one last dequeue attempt.
                expired = true;
                continue;
            }

            if (timeout >= 0) {
                // Environment.TickCount wraps around, so the subtraction is
                // done explicitly unchecked.
                remaining = unchecked(timeout - (Environment.TickCount - started));
                expired = remaining < 0;
            }
        }
    }

    return item;
}
|
|
45
|
|
/// <summary>
/// Dequeues a single item, blocking the calling thread with no time limit
/// until one becomes available.
/// </summary>
/// <returns>The dequeued item.</returns>
public T GetItem() {
    T result;

    // Fast path: take an item without touching the lock.
    if (TryDequeue(out result))
        return result;

    lock (m_lock) {
        // Re-check after every wake-up; the wait ends only once a dequeue
        // attempt actually succeeds.
        while (!TryDequeue(out result)) {
            Monitor.Wait(m_lock);
        }
    }

    return result;
}
|
|
55
|
|
/// <summary>
/// Dequeues up to <paramref name="max"/> items, blocking the calling thread
/// until a range can be dequeued or the timeout elapses.
/// </summary>
/// <param name="max">Upper bound on the number of items returned; checked with <c>Safe.ArgumentInRange</c> to be greater than zero.</param>
/// <param name="timeout">
/// Maximum time to wait, in milliseconds. The value is handed to
/// <see cref="Monitor.Wait(object, int)"/>, so <see cref="Timeout.Infinite"/>
/// (-1) waits indefinitely. Elapsed-time tracking is only done for
/// non-negative values.
/// </param>
/// <returns>A new array holding exactly the items that were dequeued.</returns>
/// <exception cref="TimeoutException">
/// The timeout elapsed and no range could be dequeued.
/// </exception>
public T[] GetRange(int max, int timeout) {
    Safe.ArgumentInRange(max > 0, nameof(max));

    var buffer = new T[max];
    int actual;

    // Fast path: try to take items without touching the lock.
    if (!TryDequeueRange(buffer, 0, max, out actual)) {
        var started = Environment.TickCount;
        var remaining = timeout;
        var expired = false;

        lock (m_lock) {
            // The queue is always re-checked before giving up, so items that
            // arrived while the wait was timing out are returned, not left behind.
            while (!TryDequeueRange(buffer, 0, max, out actual)) {
                if (expired)
                    throw new TimeoutException();

                if (!Monitor.Wait(m_lock, remaining)) {
                    // The wait timed out; make one last dequeue attempt.
                    expired = true;
                    continue;
                }

                if (timeout >= 0) {
                    // Environment.TickCount wraps around, so the subtraction
                    // is done explicitly unchecked.
                    remaining = unchecked(timeout - (Environment.TickCount - started));
                    expired = remaining < 0;
                }
            }
        }
    }

    // Trim the scratch buffer to the number of items actually dequeued.
    var data = new T[actual];
    Array.Copy(buffer, data, actual);
    return data;
}
|
|
84
|
|
/// <summary>
/// Dequeues up to <paramref name="max"/> items, blocking the calling thread
/// with no time limit until a range can be dequeued.
/// </summary>
/// <param name="max">Upper bound on the number of items returned; checked with <c>Safe.ArgumentInRange</c> to be greater than zero.</param>
/// <returns>A new array holding exactly the items that were dequeued.</returns>
public T[] GetRange(int max) {
    Safe.ArgumentInRange(max > 0, nameof(max));

    var scratch = new T[max];
    int count;

    // Fast path: try to take items without touching the lock.
    var filled = TryDequeueRange(scratch, 0, max, out count);
    if (!filled) {
        lock (m_lock) {
            // Re-check after every wake-up until a dequeue attempt succeeds.
            while (!TryDequeueRange(scratch, 0, max, out count)) {
                Monitor.Wait(m_lock);
            }
        }
    }

    // Trim the scratch buffer to the number of items actually dequeued.
    var result = new T[count];
    Array.Copy(scratch, 0, result, 0, count);
    return result;
}
|
|
99 }
|
|
100 }
|
|
101
|