Mercurial > pub > ImplabNet
comparison Implab/Parallels/AsyncQueue.cs @ 120:f1b897999260 v2
improved asyncpool usability
working on batch operations on asyncqueue
author | cin |
---|---|
date | Mon, 12 Jan 2015 05:19:52 +0300 |
parents | 2573b562e328 |
children | 62d2f1e98c4e |
comparison
equal
deleted
inserted
replaced
119:2573b562e328 | 120:f1b897999260 |
---|---|
34 public int Hi { | 34 public int Hi { |
35 get { return m_hi; } | 35 get { return m_hi; } |
36 } | 36 } |
37 | 37 |
38 public bool TryEnqueue(T value,out bool extend) { | 38 public bool TryEnqueue(T value,out bool extend) { |
39 var alloc = Interlocked.Increment(ref m_alloc) - 1; | |
40 | |
41 if (alloc >= m_size) { | |
42 extend = alloc == m_size; | |
43 return false; | |
44 } | |
45 | |
39 extend = false; | 46 extend = false; |
40 int alloc; | |
41 do { | |
42 alloc = m_alloc; | |
43 if (alloc > m_size) | |
44 return false; | |
45 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc+1, alloc)); | |
46 | |
47 if (alloc == m_size) { | |
48 extend = true; | |
49 return false; | |
50 } | |
51 | |
52 m_data[alloc] = value; | 47 m_data[alloc] = value; |
53 | 48 |
54 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { | 49 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + 1, alloc)) { |
55 // spin wait for commit | 50 // spin wait for commit |
56 } | 51 } |
69 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); | 64 } while(low != Interlocked.CompareExchange(ref m_low, low + 1, low)); |
70 | 65 |
71 recycle = (low == m_size - 1); | 66 recycle = (low == m_size - 1); |
72 value = m_data[low]; | 67 value = m_data[low]; |
73 | 68 |
69 return true; | |
70 } | |
71 | |
72 public bool TryEnqueueBatch(T[] batch, int offset, int length, out int enqueued, out bool extend) { | |
73 int alloc; | |
74 int allocSize; | |
75 | |
76 do { | |
77 alloc = m_alloc; | |
78 | |
79 if (alloc > m_size) { | |
80 enqueued = 0; | |
81 extend = false; | |
82 return false; | |
83 } | |
84 | |
85 allocSize = Math.Min(m_size - m_alloc, length); | |
86 } while(alloc != Interlocked.CompareExchange(ref m_alloc, alloc + allocSize, alloc)); | |
87 | |
88 if (alloc == m_size) { | |
89 enqueued = 0; | |
90 extend = true; | |
91 return false; | |
92 } | |
93 | |
94 Array.Copy(batch, offset, m_data, alloc, allocSize); | |
95 enqueued = allocSize; | |
96 extend = false; | |
97 | |
98 while (alloc != Interlocked.CompareExchange(ref m_hi, alloc + allocSize, alloc)) { | |
99 // spin wait for commit | |
100 } | |
74 return true; | 101 return true; |
75 } | 102 } |
76 | 103 |
77 public T GetAt(int pos) { | 104 public T GetAt(int pos) { |
78 return m_data[pos]; | 105 return m_data[pos]; |