diff Implab/Parallels/ArrayTraits.cs @ 80:4f20870d0816 v2

added memory barriers
author cin
date Fri, 26 Sep 2014 03:32:34 +0400
parents c761fc982e1d
children ce0171cacec4
line wrap: on
line diff
--- a/Implab/Parallels/ArrayTraits.cs	Mon Sep 22 18:20:49 2014 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Fri Sep 26 03:32:34 2014 +0400
@@ -153,7 +153,8 @@
             var res = new TDst[source.Length];
             var pending = source.Length;
 
-            var semaphore = new Semaphore(threads, threads);
+            object locker = new object();
+            int slots = threads;
 
             // Analysis disable AccessToDisposedClosure
             AsyncPool.InvokeNewThread(() => {
@@ -162,22 +163,28 @@
                         break; // stop processing in case of error or cancellation
                     var idx = i;
 
-                    semaphore.WaitOne();
+                    lock(locker) {
+                        while(slots == 0)
+                            Monitor.Wait(locker);
+                        slots--;
+                    }
                     try {
-                        var p1 = transform(source[i]);
-                        p1.Anyway(() => semaphore.Release());
-                        p1.Then(
-                            x => {
-                                res[idx] = x;
-                                var left = Interlocked.Decrement(ref pending);
-                                if (left == 0)
-                                    promise.Resolve(res);
-                            },
-                            e => {
-                                promise.Reject(e);
-                                throw new TransientPromiseException(e);
-                            }
-                        );
+                        transform(source[i])
+                            .Anyway(() => {
+                                lock(locker) {
+                                    slots ++;
+                                    Monitor.Pulse(locker);
+                                }
+                            })
+                            .Last(
+                                x => {
+                                    res[idx] = x;
+                                    var left = Interlocked.Decrement(ref pending);
+                                    if (left == 0)
+                                        promise.Resolve(res);
+                                },
+                                e => promise.Reject(e)
+                            );
 
                     } catch (Exception e) {
                         promise.Reject(e);
@@ -186,7 +193,7 @@
                 return 0;
             });
 
-            return promise.Anyway(semaphore.Dispose);
+            return promise;
         }
 
     }