comparison Implab/Parallels/BlockingQueue.cs @ 192:f1da3afc3521 release v2.1

Слияние с v2
author cin
date Fri, 22 Apr 2016 13:10:34 +0300
parents 041b77711262
children d6fe09f5592c
comparison
equal deleted inserted replaced
71:1714fd8678ef 192:f1da3afc3521
1 using System;
2 using System.Threading;
3
4 namespace Implab.Parallels {
5 public class BlockingQueue<T> : AsyncQueue<T> {
6 readonly object m_lock = new object();
7
8 public override void Enqueue(T value) {
9 base.Enqueue(value);
10 lock (m_lock)
11 Monitor.Pulse(m_lock);
12 }
13
14 public override void EnqueueRange(T[] data, int offset, int length) {
15 base.EnqueueRange(data, offset, length);
16 if (length > 1)
17 lock (m_lock)
18 Monitor.PulseAll(m_lock);
19 else
20 lock (m_lock)
21 Monitor.Pulse(m_lock);
22 }
23
24 public T GetItem(int timeout) {
25 T item;
26
27 if (!TryDequeue(out item)) {
28 var t1 = Environment.TickCount;
29 var dt = timeout;
30
31 lock (m_lock) {
32 while (!TryDequeue(out item)) {
33 if (!Monitor.Wait(m_lock, dt))
34 throw new TimeoutException();
35 if (timeout >= 0) {
36 dt = timeout - Environment.TickCount + t1;
37 if (dt < 0)
38 throw new TimeoutException();
39 }
40 }
41 }
42 }
43 return item;
44 }
45
46 public T GetItem() {
47 T item;
48 if (!TryDequeue(out item))
49 lock (m_lock) {
50 while (!TryDequeue(out item))
51 Monitor.Wait(m_lock);
52 }
53 return item;
54 }
55
56 public T[] GetRange(int max, int timeout) {
57 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
58
59 var buffer = new T[max];
60 int actual;
61 if (!TryDequeueRange(buffer, 0, max, out actual)) {
62 var t1 = Environment.TickCount;
63 var dt = timeout;
64
65 lock (m_lock) {
66 while (!TryDequeueRange(buffer, 0, max, out actual)) {
67
68 if (!Monitor.Wait(m_lock, dt))
69 throw new TimeoutException();
70
71 if (timeout >= 0) {
72 dt = timeout - Environment.TickCount + t1;
73 if (dt < 0)
74 throw new TimeoutException();
75 }
76 }
77 }
78 }
79
80 var data = new T[actual];
81 Array.Copy(buffer, data, actual);
82 return data;
83 }
84
85 public T[] GetRange(int max) {
86 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
87
88 var buffer = new T[max];
89 int actual;
90 if (!TryDequeueRange(buffer, 0, max, out actual))
91 lock (m_lock)
92 while (!TryDequeueRange(buffer, 0, max, out actual))
93 Monitor.Wait(m_lock);
94
95 var data = new T[actual];
96 Array.Copy(buffer, data, actual);
97 return data;
98 }
99 }
100 }
101