137
|
1 using System;
|
|
2 using System.Threading;
|
|
3
|
|
4 namespace Implab.Parallels {
|
|
5 public class BlockingQueue<T> : AsyncQueue<T> {
|
|
6 readonly object m_lock = new object();
|
|
7
|
|
8 public override void Enqueue(T value) {
|
|
9 base.Enqueue(value);
|
|
10 lock (m_lock)
|
|
11 Monitor.Pulse(m_lock);
|
|
12 }
|
|
13
|
|
14 public override void EnqueueRange(T[] data, int offset, int length) {
|
|
15 base.EnqueueRange(data, offset, length);
|
|
16 if (length > 1)
|
|
17 lock (m_lock)
|
|
18 Monitor.PulseAll(m_lock);
|
|
19 else
|
|
20 lock (m_lock)
|
|
21 Monitor.Pulse(m_lock);
|
|
22 }
|
|
23
|
|
24 public T GetItem(int timeout) {
|
|
25 T item;
|
|
26 var t1 = Environment.TickCount;
|
|
27 var dt = timeout;
|
|
28 while (!TryDequeue(out item)) {
|
|
29 lock (m_lock)
|
|
30 if (!Monitor.Wait(m_lock, dt))
|
|
31 throw new TimeoutException();
|
|
32 if (timeout >= 0) {
|
|
33 dt = timeout - Environment.TickCount + t1;
|
|
34 if (dt < 0)
|
|
35 throw new TimeoutException();
|
|
36 }
|
|
37 }
|
|
38 return item;
|
|
39 }
|
|
40
|
|
41 public T GetItem() {
|
|
42 T item;
|
|
43 while (!TryDequeue(out item))
|
|
44 lock (m_lock)
|
|
45 Monitor.Wait(m_lock);
|
|
46 return item;
|
|
47 }
|
|
48
|
|
49 public T[] GetRange(int max, int timeout) {
|
|
50 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
|
|
51
|
|
52 var buffer = new T[max];
|
|
53 int actual;
|
|
54 var t1 = Environment.TickCount;
|
|
55 var dt = timeout;
|
|
56 while (!TryDequeueRange(buffer,0,max,out actual)) {
|
|
57 lock (m_lock)
|
|
58 if (!Monitor.Wait(m_lock, dt))
|
|
59 throw new TimeoutException();
|
|
60 if (timeout >= 0) {
|
|
61 dt = timeout - Environment.TickCount + t1;
|
|
62 if (dt < 0)
|
|
63 throw new TimeoutException();
|
|
64 }
|
|
65 }
|
|
66
|
|
67 var data = new T[actual];
|
|
68 Array.Copy(buffer, data, actual);
|
|
69 return data;
|
|
70 }
|
|
71
|
|
72 public T[] GetRange(int max) {
|
|
73 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
|
|
74
|
|
75 var buffer = new T[max];
|
|
76 int actual;
|
|
77 while (!TryDequeueRange(buffer, 0, max, out actual))
|
|
78 lock (m_lock)
|
|
79 Monitor.Wait(m_lock);
|
|
80
|
|
81 var data = new T[actual];
|
|
82 Array.Copy(buffer, data, actual);
|
|
83 return data;
|
|
84 }
|
|
85 }
|
|
86 }
|
|
87
|