changeset 25:9bf5b23650c9

refactoring
author cin
date Thu, 06 Feb 2014 01:08:59 +0400
parents ee04e1fa78da
children f0bf98e4d22c
files Implab/IPromise.cs Implab/IPromiseBase.cs Implab/ITaskController.cs Implab/Implab.csproj Implab/Parallels/ArrayTraits.cs Implab/Parallels/AsyncPool.cs Implab/Promise.cs Implab/TaskController.cs
diffstat 8 files changed, 163 insertions(+), 95 deletions(-) [+]
line wrap: on
line diff
--- a/Implab/IPromise.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/IPromise.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -5,15 +5,11 @@
 
 namespace Implab
 {
-    public interface IPromise: ICancellable
+    public interface IPromise<T>: IPromiseBase
     {
-        /// <summary>
-        /// Check whereather the promise has no more than one dependent promise.
-        /// </summary>
-        bool IsExclusive
-        {
-            get;
-        }
+        
+
+
 
 
     }
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Implab/IPromiseBase.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Implab {
+    public interface IPromiseBase: ICancellable {
+        /// <summary>
+        /// Check whereather the promise has no more than one dependent promise.
+        /// </summary>
+        bool IsExclusive {
+            get;
+        }
+
+        bool IsResolved { get; }
+
+        bool IsCancelled { get; }
+    }
+}
--- a/Implab/ITaskController.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/ITaskController.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -4,9 +4,11 @@
 using System.Text;
 
 namespace Implab {
-    public interface ITaskController: IProgressHandler {
-        bool Cancelled {
+    public interface ITaskController: IProgressHandler, ICancellable {
+        bool IsCancelled {
             get;
         }
+
+        event EventHandler Cancelled;
     }
 }
--- a/Implab/Implab.csproj	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/Implab.csproj	Thu Feb 06 01:08:59 2014 +0400
@@ -36,6 +36,7 @@
     <Compile Include="IProgressHandler.cs" />
     <Compile Include="IProgressNotifier.cs" />
     <Compile Include="IPromise.cs" />
+    <Compile Include="IPromiseBase.cs" />
     <Compile Include="ITaskController.cs" />
     <Compile Include="ManagedPromise.cs" />
     <Compile Include="Parallels\DispatchPool.cs" />
--- a/Implab/Parallels/ArrayTraits.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/Parallels/ArrayTraits.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -168,45 +168,5 @@
             return promise.Anyway(() => semaphore.Dispose());
         }
 
-        /*
-        this method is pretty fast, but it may cause a stack overflow if an element transformation is made faster then the next operation is
-        be chained, in this case the syncronous callback invocation will occur
-        
-        public static Promise<TDst[]> ChainedMap2<TSrc, TDst>(this TSrc[] source, ChainedOperation<TSrc, TDst> transform, int threads) {
-            if (source == null)
-                throw new ArgumentNullException("source");
-            if (transform == null)
-                throw new ArgumentNullException("transform");
-            if (threads <= 0)
-                throw new ArgumentOutOfRangeException("Threads number must be greater then zero");
-
-            var promise = new Promise<TDst[]>();
-            var res = new TDst[source.Length];
-            var index = -1; // we will start with increment
-            var len = source.Length;
-            var pending = len;
-
-            Action<int> callback = null;
-            callback = (current) => {
-                if (current < len) {
-                    transform(source[current])
-                        .Then(
-                            x => {
-                                res[current] = x;
-                                if (Interlocked.Decrement(ref pending) == 0)
-                                    promise.Resolve(res);
-                                else
-                                    callback(Interlocked.Increment(ref index));
-                            },
-                            e => promise.Reject(e)
-                        );
-                }
-            };
-
-            for (int i = 0; i < threads; i++)
-                callback(Interlocked.Increment(ref index));
-            return promise;
-        }
-         */
     }
 }
--- a/Implab/Parallels/AsyncPool.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/Parallels/AsyncPool.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -1,28 +1,28 @@
-using System;
-using System.Threading;
-
-namespace Implab.Parallels {
-	/// <summary>
-	/// Класс для распаралеливания задач.
-	/// </summary>
-	/// <remarks>
-	/// Используя данный класс и лямда выражения можно распараллелить
-	/// вычисления, для этого используется концепция обещаний.
-	/// </remarks>
-	public static class AsyncPool {
-
-		public static Promise<T> Invoke<T>(Func<T> func) {
-			var p = new Promise<T>();
-
-			ThreadPool.QueueUserWorkItem(param => {
-				try {
+using System;
+using System.Threading;
+
+namespace Implab.Parallels {
+	/// <summary>
+	/// Класс для распаралеливания задач.
+	/// </summary>
+	/// <remarks>
+	/// Используя данный класс и лямда выражения можно распараллелить
+	/// вычисления, для этого используется концепция обещаний.
+	/// </remarks>
+	public static class AsyncPool {
+
+		public static Promise<T> Invoke<T>(Func<T> func) {
+			var p = new Promise<T>();
+
+			ThreadPool.QueueUserWorkItem(param => {
+				try {
 					p.Resolve(func());
-				} catch(Exception e) {
-					p.Reject(e);
-				}
-			});
-
-			return p;
+				} catch(Exception e) {
+					p.Reject(e);
+				}
+			});
+
+			return p;
 		}
 
         public static Promise<T> InvokeNewThread<T>(Func<T> func) {
@@ -39,6 +39,6 @@
             worker.Start();
 
             return p;
-        }
-	}
-}
+        }
+	}
+}
--- a/Implab/Promise.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/Promise.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -47,7 +47,7 @@
     /// только инициатор обещания иначе могут возникнуть противоречия.
     /// </para>
     /// </remarks>
-    public class Promise<T> : IPromise {
+    public class Promise<T> : IPromise<T> {
 
         struct HandlerDescriptor {
             public ResultHandler<T> resultHandler;
@@ -82,11 +82,11 @@
 
         const int UnresolvedSate = 0;
         const int TransitionalState = 1;
-        const int ResolvedState = 2;
+        const int SucceededState = 2;
         const int RejectedState = 3;
         const int CancelledState = 4;
 
-        readonly IPromise m_parent;
+        readonly IPromiseBase m_parent;
         readonly bool m_cancellable;
 
         int m_childrenCount = 0;
@@ -100,7 +100,7 @@
             m_cancellable = true;
         }
 
-        public Promise(IPromise parent, bool cancellable) {
+        public Promise(IPromiseBase parent, bool cancellable) {
             m_cancellable = cancellable;
             m_parent = parent;
         }
@@ -119,6 +119,12 @@
                 throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
         }
 
+        void WaitTransition() {
+            while (m_state == TransitionalState) {
+                /* noop */
+            }
+        }
+
         public bool IsResolved {
             get {
                 return m_state > 1;
@@ -139,10 +145,13 @@
         public void Resolve(T result) {
             if (BeginTransit()) {
                 m_result = result;
-                CompleteTransit(ResolvedState);
+                CompleteTransit(SucceededState);
                 OnStateChanged();
-            } else if (m_state != CancelledState)
-                throw new InvalidOperationException("The promise is already resolved");
+            } else {
+                WaitTransition();
+                if (m_state != CancelledState)
+                    throw new InvalidOperationException("The promise is already resolved");
+            }
         }
 
         /// <summary>
@@ -160,8 +169,11 @@
                 m_error = error;
                 CompleteTransit(RejectedState);
                 OnStateChanged();
-            } else if (m_state == ResolvedState)
-                throw new InvalidOperationException("The promise is already resolved");
+            } else {
+                WaitTransition();
+                if (m_state == SucceededState)
+                    throw new InvalidOperationException("The promise is already resolved");
+            }
         }
 
         /// <summary>
@@ -197,6 +209,9 @@
             ErrorHandler errorHandler;
             if (error != null)
                 errorHandler = x => {
+                    // несмотря на то, что обработчик ошибки вызывается безопасно,
+                    // т.е. возникшие в нем ошибки будут подавлены, нам нужно
+                    // гарантировать, что ошибка будет передана дальше по цепочке обещаний
                     try {
                         error(x);
                     } catch { }
@@ -238,8 +253,9 @@
                 errorHandler = x => {
                     try {
                         medium.Resolve(error(x));
-                    } catch { }
-                    medium.Reject(x);
+                    } catch(Exception e) {
+                        medium.Reject(e);
+                    }
                 };
             else
                 errorHandler = medium.Reject;
@@ -257,7 +273,7 @@
             var medium = new Promise<T>(this, true);
 
             ResultHandler<T> resultHandler;
-            
+
             if (success != null)
                 resultHandler = x => {
                     success(x);
@@ -430,6 +446,11 @@
             return this;
         }
 
+        /// <summary>
+        /// Adds the specified handler for all cases (success, error, cancel)
+        /// </summary>
+        /// <param name="handler">The handler that will be called anyway</param>
+        /// <returns>self</returns>
         public Promise<T> Finally(Action handler) {
             if (handler == null)
                 throw new ArgumentNullException("handler");
@@ -471,7 +492,7 @@
                 throw new TimeoutException();
 
             switch (m_state) {
-                case ResolvedState:
+                case SucceededState:
                     return m_result;
                 case CancelledState:
                     throw new OperationCanceledException();
@@ -517,7 +538,7 @@
 
         void InvokeHandler(HandlerDescriptor handler) {
             switch (m_state) {
-                case ResolvedState:
+                case SucceededState:
                     handler.Resolve(m_result);
                     break;
                 case RejectedState:
@@ -538,8 +559,6 @@
                 InvokeHandler(handler);
         }
 
-
-
         public bool IsExclusive {
             get {
                 return m_childrenCount <= 1;
@@ -560,5 +579,68 @@
             }
         }
 
+        /// <summary>
+        /// Объединяет несколько обещаний в одно, результатом которого является массив результатов других обещаний.
+        /// Если хотябы одно из переданных обещаний не будет выполнено, то новое обещение тоже не будет выполнено.
+        /// При отмене нового обещания, переданные обещания также будут отменены, если никто больше на них не подписан.
+        /// </summary>
+        /// <param name="promises">Список обещаний. Если список пустой, то результирующее обещание возвращается уже выполненным.</param>
+        /// <returns>Обещание объединяющее в себе результат переданных обещаний.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="promises"/> не может быть null</exception>
+        public static Promise<T[]> CreateComposite(IList<Promise<T>> promises) {
+            if (promises == null)
+                throw new ArgumentNullException();
+
+            // создаем аккумулятор для результатов и результирующее обещание
+            var result = new T[promises.Count];
+            var promise = new Promise<T[]>();
+
+            // special case
+            if (promises.Count == 0) {
+                promise.Resolve(result);
+                return promise;
+            }
+
+            int pending = promises.Count;
+
+            for (int i = 0; i < promises.Count; i++) {
+                var dest = i;
+
+                promises[i].Then(
+                    x => {
+                        result[dest] = x;
+                        if(Interlocked.Decrement(ref pending) == 0)
+                            promise.Resolve(result);
+                    },
+                    e => promise.Reject(e)
+                );
+            }
+
+            promise.Cancelled(
+                () => {
+                    foreach(var d in promises)
+                        if(d.IsExclusive)
+                            d.Cancel();
+                }
+            );
+
+            return promise;
+        }
+
+        public static Promise<T> ResultToPromise(T result) {
+            var p = new Promise<T>();
+            p.Resolve(result);
+            return p;
+        }
+
+        public static Promise<T> ExceptionToPromise(Exception error) {
+            if (error == null)
+                throw new ArgumentNullException();
+
+            var p = new Promise<T>();
+            p.Reject(error);
+            return p;
+        }
+
     }
 }
--- a/Implab/TaskController.cs	Thu Nov 14 01:15:07 2013 +0400
+++ b/Implab/TaskController.cs	Thu Feb 06 01:08:59 2014 +0400
@@ -12,7 +12,7 @@
     /// <remarks>
     /// Members of this object are thread safe.
     /// </remarks>
-    class TaskController: IProgressNotifier, ITaskController, ICancellable
+    public class TaskController: IProgressNotifier, ITaskController
     {
         readonly object m_lock;
         string m_message;
@@ -22,6 +22,7 @@
 
         bool m_cancelled;
 
+        public event EventHandler Cancelled;
         public event EventHandler<ValueEventArgs<string>> MessageUpdated;
         public event EventHandler<ValueEventArgs<float>> ProgressUpdated;
         public event EventHandler<ProgressInitEventArgs> ProgressInit;
@@ -84,7 +85,7 @@
             }
         }
 
-        public bool Cancelled {
+        public bool IsCancelled {
             get {
                 lock (m_lock)
                     return m_cancelled;
@@ -102,6 +103,13 @@
             }
         }
 
+        protected virtual void OnCancelled() {
+            var temp = Cancelled;
+            if (temp != null) {
+                temp(this,new EventArgs());
+            }
+        }
+
         protected virtual void OnMessageUpdated()
         {
             var temp = MessageUpdated;