diff Implab/Parallels/WorkerPool.cs @ 149:eb793fbbe4ea v2

fixed promises cancellation
author cin
date Wed, 06 May 2015 17:11:27 +0300
parents 471f596b2603
children
line wrap: on
line diff
--- a/Implab/Parallels/WorkerPool.cs	Wed Apr 15 07:30:20 2015 +0300
+++ b/Implab/Parallels/WorkerPool.cs	Wed May 06 17:11:27 2015 +0300
@@ -30,7 +30,49 @@
             InitPool();
         }
 
-        public Promise<T> Invoke<T>(Func<T> task) {
+        public IPromise<T> Invoke<T>(Func<T> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new FuncTask<T>(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise Invoke(Action task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new ActionTask(task, null, null, true);
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+
+                promise.Resolve();
+
+                TraceContext.Instance.Leave();
+            });
+
+            return promise;
+        }
+
+        public IPromise<T> Invoke<T>(Func<ICancellationToken, T> task) {
             if (task == null)
                 throw new ArgumentNullException("task");
             if (IsDisposed)
@@ -43,7 +85,35 @@
             EnqueueTask(delegate {
                 TraceContext.Instance.EnterLogicalOperation(lop, false);
                 try {
-                    promise.Resolve(task());
+                    if (!promise.CancelOperationIfRequested())
+                        promise.Resolve(task(promise));
+                } catch (Exception e) {
+                    promise.Reject(e);
+                } finally {
+                    TraceContext.Instance.Leave();
+                }
+            });
+
+            return promise;
+        }
+
+        public IPromise Invoke<T>(Action<ICancellationToken> task) {
+            if (task == null)
+                throw new ArgumentNullException("task");
+            if (IsDisposed)
+                throw new ObjectDisposedException(ToString());
+
+            var promise = new Promise();
+
+            var lop = TraceContext.Instance.CurrentOperation;
+
+            EnqueueTask(delegate {
+                TraceContext.Instance.EnterLogicalOperation(lop, false);
+                try {
+                    if (!promise.CancelOperationIfRequested()) {
+                        task(promise);
+                        promise.Resolve();
+                    }
                 } catch (Exception e) {
                     promise.Reject(e);
                 } finally {