changeset 123:f4d6ea6969cc v2

async queue improvements
author cin
date Tue, 13 Jan 2015 01:42:38 +0300 (2015-01-12)
parents 0c8685c8b56b
children a336cb13c6a9
files Implab/Parallels/AsyncQueue.cs
diffstat 1 files changed, 113 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- a/Implab/Parallels/AsyncQueue.cs	Mon Jan 12 22:20:45 2015 +0300
+++ b/Implab/Parallels/AsyncQueue.cs	Tue Jan 13 01:42:38 2015 +0300
@@ -386,6 +386,119 @@
 
         }
 
+        public void Clear() {
+            // start the new queue
+            var t = new Chunk(m_chunkSize);
+            Thread.MemoryBarrier();
+            m_last = t;
+            Thread.MemoryBarrier();
+
+            // make the new queue available to the readers, and stop the old one
+            m_first = t;
+            Thread.MemoryBarrier();
+        }
+
+        public T[] Drain() {
+            // start the new queue
+            var t = new Chunk(m_chunkSize);
+            Thread.MemoryBarrier();
+            m_last = t;
+            Thread.MemoryBarrier();
+
+            // make the new queue available to the readers, and stop the old one
+            Chunk first;
+
+            do {
+                first = m_first;
+            } while(first != Interlocked.CompareExchange(ref m_first
+            Thread.MemoryBarrier();
+
+
+        }
+            
+        T[] ReadChunks(Chunk chunk) {
+            var result = new List<T>();
+            var buffer = new T[m_chunkSize];
+            int actual;
+            bool recycle;
+            while (chunk != null) {
+                // we need to read the chunk using this way
+                // since some client still may completing the dequeue
+                // operation, such clients most likely won't get results
+                while (chunk.TryDequeueBatch(buffer, 0, buffer.Length, out actual, out recycle))
+                    result.AddRange(new ArraySegmentCollection(buffer, 0, actual));
+
+                chunk = chunk.next;
+            }
+
+            return result.ToArray();
+        }
+
+        struct ArraySegmentCollection : ICollection<T> {
+            readonly T[] m_data;
+            readonly int m_offset;
+            readonly int m_length;
+
+            public ArraySegmentCollection(T[] data, int offset, int length) {
+                m_data = data;
+                m_offset = offset;
+                m_length = length;
+            }
+
+            #region ICollection implementation
+
+            public void Add(T item) {
+                throw new InvalidOperationException();
+            }
+
+            public void Clear() {
+                throw new InvalidOperationException();
+            }
+
+            public bool Contains(T item) {
+                return false;
+            }
+
+            public void CopyTo(T[] array, int arrayIndex) {
+                Array.Copy(m_data,m_offset,array,arrayIndex, m_length);
+            }
+
+            public bool Remove(T item) {
+                throw new NotImplementedException();
+            }
+
+            public int Count {
+                get {
+                    return m_length;
+                }
+            }
+
+            public bool IsReadOnly {
+                get {
+                    return true;
+                }
+            }
+
+            #endregion
+
+            #region IEnumerable implementation
+
+            public IEnumerator<T> GetEnumerator() {
+                for (int i = m_offset; i < m_length + m_offset; i++)
+                    yield return m_data[i];
+            }
+
+            #endregion
+
+            #region IEnumerable implementation
+
+            IEnumerator IEnumerable.GetEnumerator() {
+                return GetEnumerator();
+            }
+
+            #endregion
+        }
+
         #region IEnumerable implementation
 
         class Enumerator : IEnumerator<T> {