annotate Implab/Parallels/BlockingQueue.cs @ 218:babe55c34931 v2

Trace logical operations in the correct channel
author cin
date Thu, 04 May 2017 02:17:53 +0300
parents 041b77711262
children d6fe09f5592c
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
137
238e15580926 added the blocking queue
cin
parents:
diff changeset
1 using System;
238e15580926 added the blocking queue
cin
parents:
diff changeset
2 using System.Threading;
238e15580926 added the blocking queue
cin
parents:
diff changeset
3
238e15580926 added the blocking queue
cin
parents:
diff changeset
4 namespace Implab.Parallels {
238e15580926 added the blocking queue
cin
parents:
diff changeset
/// <summary>
/// An <see cref="AsyncQueue{T}"/> whose consumers can block until data is
/// enqueued, optionally giving up after a timeout.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class BlockingQueue<T> : AsyncQueue<T> {
    // Used only to park and wake consumers. Producers add items through the
    // base class first and take this lock afterwards just to pulse.
    readonly object m_lock = new object();

    /// <summary>
    /// Adds <paramref name="value"/> through the base implementation and then
    /// wakes one consumer waiting on this queue, if any.
    /// </summary>
    /// <param name="value">The item to enqueue.</param>
    public override void Enqueue(T value) {
        base.Enqueue(value);
        lock (m_lock)
            Monitor.Pulse(m_lock);
    }

    /// <summary>
    /// Adds a range of items through the base implementation and then wakes
    /// waiting consumers.
    /// </summary>
    /// <param name="data">Source array, passed to the base implementation.</param>
    /// <param name="offset">Start offset, passed to the base implementation.</param>
    /// <param name="length">Number of items, passed to the base implementation.</param>
    public override void EnqueueRange(T[] data, int offset, int length) {
        base.EnqueueRange(data, offset, length);
        lock (m_lock) {
            // More than one item may satisfy more than one consumer, so wake
            // all of them; otherwise a single wake-up is sufficient.
            if (length > 1)
                Monitor.PulseAll(m_lock);
            else
                Monitor.Pulse(m_lock);
        }
    }

    /// <summary>
    /// Dequeues one item, blocking for at most <paramref name="timeout"/>
    /// milliseconds while the queue is empty.
    /// </summary>
    /// <param name="timeout">
    /// Maximum time to wait in milliseconds; <c>-1</c>
    /// (<see cref="Timeout.Infinite"/>) waits indefinitely.
    /// </param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="TimeoutException">
    /// No item could be dequeued within the specified time.
    /// </exception>
    public T GetItem(int timeout) {
        T item;

        // Fast path: do not touch the lock when data is already available.
        if (TryDequeue(out item))
            return item;

        var started = Environment.TickCount;

        lock (m_lock) {
            // The dequeue is always retried first after waking up, so a pulse
            // received by this thread is never discarded without looking at
            // the queue.
            while (!TryDequeue(out item)) {
                if (WaitForData(timeout, started))
                    continue;

                // Out of time: make one last attempt so that a pulse which
                // raced with the deadline does not leave an item stranded
                // while other consumers are still parked.
                if (TryDequeue(out item))
                    break;

                throw new TimeoutException();
            }
        }

        return item;
    }

    /// <summary>
    /// Dequeues one item, blocking indefinitely while the queue is empty.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    public T GetItem() {
        T item;

        // Fast path: do not touch the lock when data is already available.
        if (TryDequeue(out item))
            return item;

        lock (m_lock) {
            while (!TryDequeue(out item))
                Monitor.Wait(m_lock);
        }

        return item;
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking for at most
    /// <paramref name="timeout"/> milliseconds while nothing can be dequeued.
    /// </summary>
    /// <param name="max">
    /// Maximum number of items to return; checked with
    /// <c>Safe.ArgumentInRange</c> against the range 1..<see cref="Int32.MaxValue"/>.
    /// </param>
    /// <param name="timeout">
    /// Maximum time to wait in milliseconds; <c>-1</c>
    /// (<see cref="Timeout.Infinite"/>) waits indefinitely.
    /// </param>
    /// <returns>
    /// An array whose length is the item count reported by <c>TryDequeueRange</c>.
    /// </returns>
    /// <exception cref="TimeoutException">
    /// Nothing could be dequeued within the specified time.
    /// </exception>
    public T[] GetRange(int max, int timeout) {
        Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

        var buffer = new T[max];
        int actual;

        if (!TryDequeueRange(buffer, 0, max, out actual)) {
            var started = Environment.TickCount;

            lock (m_lock) {
                // Same protocol as GetItem(int): retry first, wait second,
                // and make a final attempt before reporting a timeout.
                while (!TryDequeueRange(buffer, 0, max, out actual)) {
                    if (WaitForData(timeout, started))
                        continue;

                    if (TryDequeueRange(buffer, 0, max, out actual))
                        break;

                    throw new TimeoutException();
                }
            }
        }

        // Shrinks the buffer to the number of items actually read; this is a
        // no-op (no extra allocation) when the buffer was filled completely.
        Array.Resize(ref buffer, actual);
        return buffer;
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking indefinitely
    /// while nothing can be dequeued.
    /// </summary>
    /// <param name="max">
    /// Maximum number of items to return; checked with
    /// <c>Safe.ArgumentInRange</c> against the range 1..<see cref="Int32.MaxValue"/>.
    /// </param>
    /// <returns>
    /// An array whose length is the item count reported by <c>TryDequeueRange</c>.
    /// </returns>
    public T[] GetRange(int max) {
        Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

        var buffer = new T[max];
        int actual;

        if (!TryDequeueRange(buffer, 0, max, out actual)) {
            lock (m_lock) {
                while (!TryDequeueRange(buffer, 0, max, out actual))
                    Monitor.Wait(m_lock);
            }
        }

        // Shrinks the buffer to the number of items actually read; this is a
        // no-op (no extra allocation) when the buffer was filled completely.
        Array.Resize(ref buffer, actual);
        return buffer;
    }

    // Waits on m_lock (which the caller must hold) until pulsed or until the
    // time budget that began at tick count 'started' is used up. Returns false
    // when the budget is exhausted. A negative timeout is handed to
    // Monitor.Wait unchanged, so -1 waits forever and other negative values
    // are rejected by Monitor.Wait itself.
    bool WaitForData(int timeout, int started) {
        if (timeout < 0)
            return Monitor.Wait(m_lock, timeout);

        // Explicitly unchecked so that Environment.TickCount wrapping around
        // yields the correct elapsed time regardless of compiler settings.
        var elapsed = unchecked(Environment.TickCount - started);
        var remaining = timeout - elapsed;

        return remaining >= 0 && Monitor.Wait(m_lock, remaining);
    }
}
238e15580926 added the blocking queue
cin
parents:
diff changeset
100 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
101