annotate Implab/Parallels/BlockingQueue.cs @ 138:f75cfa58e3d4 v2

added ICancellable.Cancel(Exception) to allow specify the reason of cancellation
author cin
date Tue, 17 Feb 2015 18:16:26 +0300
parents 238e15580926
children 041b77711262
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
137
238e15580926 added the blocking queue
cin
parents:
diff changeset
1 using System;
238e15580926 added the blocking queue
cin
parents:
diff changeset
2 using System.Threading;
238e15580926 added the blocking queue
cin
parents:
diff changeset
3
238e15580926 added the blocking queue
cin
parents:
diff changeset
4 namespace Implab.Parallels {
238e15580926 added the blocking queue
cin
parents:
diff changeset
5 public class BlockingQueue<T> : AsyncQueue<T> {
238e15580926 added the blocking queue
cin
parents:
diff changeset
6 readonly object m_lock = new object();
238e15580926 added the blocking queue
cin
parents:
diff changeset
7
238e15580926 added the blocking queue
cin
parents:
diff changeset
8 public override void Enqueue(T value) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
9 base.Enqueue(value);
238e15580926 added the blocking queue
cin
parents:
diff changeset
10 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
11 Monitor.Pulse(m_lock);
238e15580926 added the blocking queue
cin
parents:
diff changeset
12 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
13
238e15580926 added the blocking queue
cin
parents:
diff changeset
14 public override void EnqueueRange(T[] data, int offset, int length) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
15 base.EnqueueRange(data, offset, length);
238e15580926 added the blocking queue
cin
parents:
diff changeset
16 if (length > 1)
238e15580926 added the blocking queue
cin
parents:
diff changeset
17 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
18 Monitor.PulseAll(m_lock);
238e15580926 added the blocking queue
cin
parents:
diff changeset
19 else
238e15580926 added the blocking queue
cin
parents:
diff changeset
20 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
21 Monitor.Pulse(m_lock);
238e15580926 added the blocking queue
cin
parents:
diff changeset
22 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
23
238e15580926 added the blocking queue
cin
parents:
diff changeset
24 public T GetItem(int timeout) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
25 T item;
238e15580926 added the blocking queue
cin
parents:
diff changeset
26 var t1 = Environment.TickCount;
238e15580926 added the blocking queue
cin
parents:
diff changeset
27 var dt = timeout;
238e15580926 added the blocking queue
cin
parents:
diff changeset
28 while (!TryDequeue(out item)) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
29 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
30 if (!Monitor.Wait(m_lock, dt))
238e15580926 added the blocking queue
cin
parents:
diff changeset
31 throw new TimeoutException();
238e15580926 added the blocking queue
cin
parents:
diff changeset
32 if (timeout >= 0) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
33 dt = timeout - Environment.TickCount + t1;
238e15580926 added the blocking queue
cin
parents:
diff changeset
34 if (dt < 0)
238e15580926 added the blocking queue
cin
parents:
diff changeset
35 throw new TimeoutException();
238e15580926 added the blocking queue
cin
parents:
diff changeset
36 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
37 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
38 return item;
238e15580926 added the blocking queue
cin
parents:
diff changeset
39 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
40
238e15580926 added the blocking queue
cin
parents:
diff changeset
41 public T GetItem() {
238e15580926 added the blocking queue
cin
parents:
diff changeset
42 T item;
238e15580926 added the blocking queue
cin
parents:
diff changeset
43 while (!TryDequeue(out item))
238e15580926 added the blocking queue
cin
parents:
diff changeset
44 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
45 Monitor.Wait(m_lock);
238e15580926 added the blocking queue
cin
parents:
diff changeset
46 return item;
238e15580926 added the blocking queue
cin
parents:
diff changeset
47 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
48
238e15580926 added the blocking queue
cin
parents:
diff changeset
49 public T[] GetRange(int max, int timeout) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
50 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
238e15580926 added the blocking queue
cin
parents:
diff changeset
51
238e15580926 added the blocking queue
cin
parents:
diff changeset
52 var buffer = new T[max];
238e15580926 added the blocking queue
cin
parents:
diff changeset
53 int actual;
238e15580926 added the blocking queue
cin
parents:
diff changeset
54 var t1 = Environment.TickCount;
238e15580926 added the blocking queue
cin
parents:
diff changeset
55 var dt = timeout;
238e15580926 added the blocking queue
cin
parents:
diff changeset
56 while (!TryDequeueRange(buffer,0,max,out actual)) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
57 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
58 if (!Monitor.Wait(m_lock, dt))
238e15580926 added the blocking queue
cin
parents:
diff changeset
59 throw new TimeoutException();
238e15580926 added the blocking queue
cin
parents:
diff changeset
60 if (timeout >= 0) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
61 dt = timeout - Environment.TickCount + t1;
238e15580926 added the blocking queue
cin
parents:
diff changeset
62 if (dt < 0)
238e15580926 added the blocking queue
cin
parents:
diff changeset
63 throw new TimeoutException();
238e15580926 added the blocking queue
cin
parents:
diff changeset
64 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
65 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
66
238e15580926 added the blocking queue
cin
parents:
diff changeset
67 var data = new T[actual];
238e15580926 added the blocking queue
cin
parents:
diff changeset
68 Array.Copy(buffer, data, actual);
238e15580926 added the blocking queue
cin
parents:
diff changeset
69 return data;
238e15580926 added the blocking queue
cin
parents:
diff changeset
70 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
71
238e15580926 added the blocking queue
cin
parents:
diff changeset
72 public T[] GetRange(int max) {
238e15580926 added the blocking queue
cin
parents:
diff changeset
73 Safe.ArgumentInRange(max, 1, Int32.MaxValue, "max");
238e15580926 added the blocking queue
cin
parents:
diff changeset
74
238e15580926 added the blocking queue
cin
parents:
diff changeset
75 var buffer = new T[max];
238e15580926 added the blocking queue
cin
parents:
diff changeset
76 int actual;
238e15580926 added the blocking queue
cin
parents:
diff changeset
77 while (!TryDequeueRange(buffer, 0, max, out actual))
238e15580926 added the blocking queue
cin
parents:
diff changeset
78 lock (m_lock)
238e15580926 added the blocking queue
cin
parents:
diff changeset
79 Monitor.Wait(m_lock);
238e15580926 added the blocking queue
cin
parents:
diff changeset
80
238e15580926 added the blocking queue
cin
parents:
diff changeset
81 var data = new T[actual];
238e15580926 added the blocking queue
cin
parents:
diff changeset
82 Array.Copy(buffer, data, actual);
238e15580926 added the blocking queue
cin
parents:
diff changeset
83 return data;
238e15580926 added the blocking queue
cin
parents:
diff changeset
84 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
85 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
86 }
238e15580926 added the blocking queue
cin
parents:
diff changeset
87