diff Implab/Parallels/SimpleAsyncQueue.cs @ 233:d6fe09f5592c v2

Improved AsyncQueue Removed ImplabFx
author cin
date Wed, 04 Oct 2017 15:44:47 +0300
parents
children cbe10ac0731e
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/Parallels/SimpleAsyncQueue.cs	Wed Oct 04 15:44:47 2017 +0300
@@ -0,0 +1,131 @@
+using System.Threading;
+using System.Collections.Generic;
+using System;
+using System.Collections;
+
+namespace Implab.Parallels {
+    public class SimpleAsyncQueue<T> : IEnumerable<T> {
+        class Node {
+            public Node(T value) {
+                this.value = value;
+            }
+            public readonly T value;
+            public volatile Node next;
+        }
+
+        // the reader and the writer are mainteined completely independent,
+        // the reader can read next item when m_first.next is not null
+        // the writer creates the a new node, moves m_last to this node and
+        // only after that restores the reference from the previous node
+        // making available the reader to read the new node.
+
+        Node m_first; // position on the node which is already read
+        Node m_last; // position on the node which is already written
+
+        public SimpleAsyncQueue() {
+            m_first = m_last = new Node(default(T));
+        }
+
+        public void Enqueue(T value) {
+            var next = new Node(value);
+
+            // Interlocaked.CompareExchange implies Thread.MemoryBarrier();
+            // to ensure that the next node is completely constructed
+            var last = Interlocked.Exchange(ref m_last, next);
+
+            // release-fence
+            last.next = next;
+            
+        }
+
+        public bool TryDequeue(out T value) {
+            Node first;
+            Node next;
+
+            Thread.MemoryBarrier(); // ensure m_first is fresh
+            SpinWait spin = new SpinWait();
+            do {
+                first = m_first;
+                // aquire-fence
+                next = first.next;
+                if (next == null) {
+                    value = default(T);
+                    return false;
+                }
+                
+                if (first == Interlocked.CompareExchange(ref m_first, next, first))
+                    // head succesfully updated
+                    break;
+                spin.SpinOnce();
+            } while (true);
+
+            value = next.value;
+            return true;
+        }
+
+        #region IEnumerable implementation
+
+        class Enumerator : IEnumerator<T> {
+            Node m_current;
+            Node m_first;
+
+            public Enumerator(Node first) {
+                m_first = first;
+            }
+
+            #region IEnumerator implementation
+
+            public bool MoveNext() {
+                m_current = m_current == null ? m_first : m_current.next;
+                return m_current != null;
+            }
+
+            public void Reset() {
+                m_current = null;
+            }
+
+            object IEnumerator.Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.value;
+                }
+            }
+
+            #endregion
+
+            #region IDisposable implementation
+
+            public void Dispose() {
+            }
+
+            #endregion
+
+            #region IEnumerator implementation
+
+            public T Current {
+                get {
+                    if (m_current == null)
+                        throw new InvalidOperationException();
+                    return m_current.value;
+                }
+            }
+
+            #endregion
+        }
+
+        public IEnumerator<T> GetEnumerator() {
+            return new Enumerator(m_first);
+        }
+
+        #endregion
+
+        #region IEnumerable implementation
+
+        IEnumerator IEnumerable.GetEnumerator() {
+            return GetEnumerator();
+        }
+
+        #endregion
+    }
+}