annotate Implab/Parallels/BlockingQueue.cs @ 187:dd4a3590f9c6 ref20160224

Reworked cancelation handling: if the cancel handler isn't specified, the OperationCanceledException will be handled by the error handler. Any unhandled OperationCanceledException will cause the promise cancelation.
author cin
date Tue, 19 Apr 2016 17:35:20 +0300
parents 041b77711262
children d6fe09f5592c
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
137
238e15580926 added the blocking queue
cin
parents:
diff changeset
1 using System;
238e15580926 added the blocking queue
cin
parents:
diff changeset
2 using System.Threading;
238e15580926 added the blocking queue
cin
parents:
diff changeset
3
238e15580926 added the blocking queue
cin
parents:
diff changeset
4 namespace Implab.Parallels {
238e15580926 added the blocking queue
cin
parents:
diff changeset
/// <summary>
/// A queue whose consumers can block until data becomes available.
/// </summary>
/// <remarks>
/// Storage and the non-blocking <c>TryDequeue</c>/<c>TryDequeueRange</c> operations are
/// inherited from <c>AsyncQueue&lt;T&gt;</c>. This class only adds a private monitor:
/// producers pulse it after every enqueue, and consumers wait on it while the queue
/// yields nothing.
/// NOTE(review): this relies on the inherited operations being safe to call concurrently
/// from several threads; that is not visible in this file and should be confirmed.
/// </remarks>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class BlockingQueue<T> : AsyncQueue<T> {
    // Private monitor used only for signalling between producers and blocked consumers.
    readonly object m_lock = new object();

    /// <summary>
    /// Adds a single item to the queue and wakes one blocked consumer, if any.
    /// </summary>
    /// <param name="value">The item to enqueue.</param>
    public override void Enqueue(T value) {
        // The item is stored first, so a consumer woken by the pulse can find it.
        base.Enqueue(value);

        lock (m_lock) {
            Monitor.Pulse(m_lock);
        }
    }

    /// <summary>
    /// Adds a range of items to the queue and wakes blocked consumers.
    /// </summary>
    /// <param name="data">Array holding the items to enqueue.</param>
    /// <param name="offset">Index of the first item in <paramref name="data"/>.</param>
    /// <param name="length">Number of items to enqueue.</param>
    public override void EnqueueRange(T[] data, int offset, int length) {
        base.EnqueueRange(data, offset, length);

        lock (m_lock) {
            // Several new items may satisfy several consumers, so wake them all;
            // otherwise waking a single consumer is enough.
            if (length > 1)
                Monitor.PulseAll(m_lock);
            else
                Monitor.Pulse(m_lock);
        }
    }

    /// <summary>
    /// Dequeues one item, blocking for at most <paramref name="timeout"/> milliseconds
    /// while the queue is empty.
    /// </summary>
    /// <param name="timeout">
    /// Maximum time to wait, in milliseconds. A negative value disables the deadline
    /// bookkeeping; the value is passed as is to <see cref="Monitor.Wait(object, int)"/>,
    /// for which <c>-1</c> means an infinite wait.
    /// </param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="TimeoutException">
    /// No item could be dequeued before the timeout elapsed.
    /// </exception>
    public T GetItem(int timeout) {
        T item;

        // Fast path: take an available item without touching the monitor.
        if (TryDequeue(out item))
            return item;

        var started = Environment.TickCount;
        var remaining = timeout;

        lock (m_lock) {
            while (!TryDequeue(out item)) {
                if (!Monitor.Wait(m_lock, remaining)) {
                    // The wait timed out. Look once more before giving up, so an item
                    // that arrived right at the deadline is not left behind.
                    if (TryDequeue(out item))
                        break;
                    throw new TimeoutException();
                }

                // Woken by a producer. Even if the deadline has passed meanwhile, the
                // loop condition re-checks the queue first: this thread has consumed
                // the pulse, so it must not leave without trying to take the item.
                if (timeout >= 0)
                    remaining = RemainingTime(timeout, started);
            }
        }

        return item;
    }

    /// <summary>
    /// Dequeues one item, blocking without a time limit while the queue is empty.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    public T GetItem() {
        T item;

        // Fast path: take an available item without touching the monitor.
        if (TryDequeue(out item))
            return item;

        lock (m_lock) {
            while (!TryDequeue(out item))
                Monitor.Wait(m_lock);
        }

        return item;
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking for at most
    /// <paramref name="timeout"/> milliseconds while nothing can be dequeued.
    /// </summary>
    /// <param name="max">Maximum number of items to return; must be at least 1.</param>
    /// <param name="timeout">
    /// Maximum time to wait, in milliseconds. A negative value disables the deadline
    /// bookkeeping; the value is passed as is to <see cref="Monitor.Wait(object, int)"/>,
    /// for which <c>-1</c> means an infinite wait.
    /// </param>
    /// <returns>A new array holding exactly the items that were dequeued.</returns>
    /// <exception cref="TimeoutException">
    /// Nothing could be dequeued before the timeout elapsed.
    /// </exception>
    public T[] GetRange(int max, int timeout) {
        Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

        var buffer = new T[max];
        int actual;

        // Fast path: take available items without touching the monitor.
        if (!TryDequeueRange(buffer, 0, max, out actual)) {
            var started = Environment.TickCount;
            var remaining = timeout;

            lock (m_lock) {
                while (!TryDequeueRange(buffer, 0, max, out actual)) {
                    if (!Monitor.Wait(m_lock, remaining)) {
                        // The wait timed out. Look once more before giving up, so data
                        // that arrived right at the deadline is not left behind.
                        if (TryDequeueRange(buffer, 0, max, out actual))
                            break;
                        throw new TimeoutException();
                    }

                    // Woken by a producer: re-check the queue before honouring the deadline.
                    if (timeout >= 0)
                        remaining = RemainingTime(timeout, started);
                }
            }
        }

        return CopyPrefix(buffer, actual);
    }

    /// <summary>
    /// Dequeues up to <paramref name="max"/> items, blocking without a time limit
    /// while nothing can be dequeued.
    /// </summary>
    /// <param name="max">Maximum number of items to return; must be at least 1.</param>
    /// <returns>A new array holding exactly the items that were dequeued.</returns>
    public T[] GetRange(int max) {
        Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");

        var buffer = new T[max];
        int actual;

        // Fast path: take available items without touching the monitor.
        if (!TryDequeueRange(buffer, 0, max, out actual)) {
            lock (m_lock) {
                while (!TryDequeueRange(buffer, 0, max, out actual))
                    Monitor.Wait(m_lock);
            }
        }

        return CopyPrefix(buffer, actual);
    }

    // Milliseconds left of a non-negative timeout that began at tick count 'started',
    // clamped to 0 so the caller makes one final non-blocking attempt.
    static int RemainingTime(int timeout, int started) {
        // Environment.TickCount wraps around; unchecked subtraction still yields the
        // elapsed time and cannot raise OverflowException in a checked build.
        var elapsed = unchecked(Environment.TickCount - started);
        return elapsed < timeout ? timeout - elapsed : 0;
    }

    // Returns a new array containing the first 'count' elements of 'buffer'.
    static T[] CopyPrefix(T[] buffer, int count) {
        var data = new T[count];
        Array.Copy(buffer, data, count);
        return data;
    }
}
238e15580926 added the blocking queue
cin
parents:
diff changeset
100 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
101