comparison Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2

Improved asyncpool usability; working on batch operations on asyncqueue.
author cin
date Mon, 12 Jan 2015 05:19:52 +0300
parents 2573b562e328
children 62d2f1e98c4e
comparison
equal deleted inserted replaced
119:2573b562e328 120:f1b897999260
34 public int Hi { 34 public int Hi {
35 get { return m_hi; } 35 get { return m_hi; }
36 } 36 }
37 37
38 public bool TryEnqueue(T value,out bool extend) { 38 public bool TryEnqueue(T value,out bool extend) {
39 var alloc = Interlocked.Increment(ref m_alloc) - 1;
40
41 if (alloc >= m_size) {
42 extend = alloc == m_size;
43 return false;
44 }
45
39 extend = false; 46 extend = false;
40 int alloc;
41 do {
42 alloc = m_alloc;
43 if (alloc > m_size)
44 return false;
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc));
46
47 if (alloc == m_size) {
48 extend = true;
49 return false;
50 }
51
52 m_data[alloc] = value; 47 m_data[alloc] = value;
53 48
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { 49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) {
55 // spin wait for commit 50 // spin wait for commit
56 } 51 }
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); 64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low));
70 65
71 recycle = (low == m_size - 1); 66 recycle = (low == m_size - 1);
72 value = m_data[low]; 67 value = m_data[low];
73 68
69 return true;
70 }
71
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) {
73 int alloc;
74 int allocSize;
75
76 do {
77 alloc = m_alloc;
78
79 if (alloc > m_size) {
80 enqueued = 0;
81 extend = false;
82 return false;
83 }
84
85 allocSize = Math.Min(m_size - m_alloc, length);
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc));
87
88 if (alloc == m_size) {
89 enqueued = 0;
90 extend = true;
91 return false;
92 }
93
94 Array.Copy(batch, offset, m_data, alloc, allocSize);
95 enqueued = allocSize;
96 extend = false;
97
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) {
99 // spin wait for commit
100 }
74 return true; 101 return true;
75 } 102 }
76 103
77 public T GetAt(int pos) { 104 public T GetAt(int pos) {
78 return m_data[pos]; 105 return m_data[pos];