Mercurial > pub > ImplabNet
annotate Implab/AbstractEvent.cs @ 242:cbe10ac0731e v3
Working on promises
| author | cin | 
|---|---|
| date | Wed, 24 Jan 2018 03:03:21 +0300 | 
| parents | fa6cbf4d8841 | 
| children | b1e0ffdf3451 | 
| rev | line source | 
|---|---|
| 144 | 1 using System; | 
| 2 using Implab.Parallels; | |
| 3 using System.Threading; | |
| 4 using System.Reflection; | |
| 242 | 5 using System.Diagnostics; | 
| 144 | 6 | 
namespace Implab {
    /// <summary>
    /// Base class for a one-shot event which leaves the pending state exactly once,
    /// ending either in the succeeded or in the rejected state, and signals the
    /// registered handlers when that happens.
    /// </summary>
    /// <typeparam name="THandler">The type of the handlers which are signalled on completion.</typeparam>
    public abstract class AbstractEvent<THandler> where THandler : class {

        // Initial state, no result has been set yet.
        const int PENDING_STATE = 0;
        // A thread has won the right to set the result, but hasn't published it yet.
        protected const int TRANSITIONAL_STATE = 1;

        // Final states, both are greater than TRANSITIONAL_STATE.
        protected const int SUCCEEDED_STATE = 2;
        protected const int REJECTED_STATE = 3;

        volatile int m_state;
        Exception m_error;

        // The first handler is stored in the field, additional handlers go to the queue
        // which is created only when it is needed.
        THandler m_handler;
        SimpleAsyncQueue<THandler> m_extraHandlers;

        #region state management
        /// <summary>
        /// Tries to move the object from the pending state to the transitional state.
        /// </summary>
        /// <returns><c>true</c> if this call has performed the transition, otherwise <c>false</c>.</returns>
        protected bool BeginTransit() {
            return PENDING_STATE == Interlocked.CompareExchange(ref m_state, TRANSITIONAL_STATE, PENDING_STATE);
        }

        /// <summary>
        /// Publishes the final state and signals the handlers which are already registered.
        /// </summary>
        /// <param name="state">The state to set.</param>
        /// <exception cref="InvalidOperationException">
        /// DEBUG builds only: the object isn't in the transitional state.
        /// </exception>
        protected void CompleteTransit(int state) {
#if DEBUG
            if (TRANSITIONAL_STATE != Interlocked.CompareExchange(ref m_state, state, TRANSITIONAL_STATE))
                throw new InvalidOperationException("Can't complete transition when the object isn't in the transitional state");
#else
            m_state = state;
#endif
            Signal();
        }

        /// <summary>
        /// Spins while the object is in the transitional state.
        /// </summary>
        protected void WaitTransition() {
            if (m_state == TRANSITIONAL_STATE) {
                // explicit initialization, an unassigned struct local isn't guaranteed
                // to pass the definite assignment check
                SpinWait spin = new SpinWait();
                do {
                    spin.SpinOnce();
                } while (m_state == TRANSITIONAL_STATE);
            }
        }

        /// <summary>
        /// Tries to start setting the result. When the transition can't be started
        /// this method waits until the object leaves the transitional state.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the caller has started the transition and must complete it,
        /// otherwise <c>false</c>.
        /// </returns>
        protected bool BeginSetResult() {
            if (!BeginTransit()) {
                WaitTransition();
                return false;
            }
            return true;
        }

        /// <summary>
        /// Completes the transition started with <see cref="BeginSetResult"/> by moving
        /// the object to the succeeded state.
        /// </summary>
        protected void EndSetResult() {
            CompleteTransit(SUCCEEDED_STATE);
        }



        /// <summary>
        /// Completes the promise with an error.
        /// </summary>
        /// <remarks>
        /// Since the promise must work in a multithreaded environment, several threads
        /// may report an error at the same time, only the first error is used as the
        /// result, the others are ignored.
        /// </remarks>
        /// <param name="error">The exception which occurred while performing the operation.</param>
        /// <exception cref="InvalidOperationException">The promise is already resolved successfully.</exception>
        protected void SetError(Exception error) {
            if (BeginTransit()) {
                // the error is stored before the final state is published
                m_error = error;
                CompleteTransit(REJECTED_STATE);
            } else {
                WaitTransition();
                if (m_state == SUCCEEDED_STATE)
                    throw new InvalidOperationException("The promise is already resolved");
            }
        }

        /// <summary>
        /// Delivers the signal to the specified handler.
        /// </summary>
        /// <param name="handler">The handler to signal.</param>
        /// <param name="signal">The state of this object at the moment the handler is signalled.</param>
        protected abstract void SignalHandler(THandler handler, int signal);

        // Dequeues the registered handlers one by one and signals them with the current state.
        void Signal() {
            THandler handler;
            while (TryDequeueHandler(out handler))
                SignalHandler(handler, m_state);
        }

        #endregion

        /// <summary>
        /// Gets the signal which is used by <see cref="WaitResult"/> to wait for the completion.
        /// </summary>
        protected abstract Signal GetFulfillSignal();

        #region synchronization traits
        /// <summary>
        /// Waits until this object is completed and rethrows the error if it was rejected.
        /// </summary>
        /// <param name="timeout">
        /// The timeout which is passed to the fulfill signal as is
        /// (presumably milliseconds, verify against <c>Signal.Wait</c>).
        /// </param>
        /// <exception cref="TimeoutException">The wait on the fulfill signal returned <c>false</c>.</exception>
        protected void WaitResult(int timeout) {
            if (!(IsFulfilled || GetFulfillSignal().Wait(timeout)))
                throw new TimeoutException();

            if (IsRejected)
                Rethrow();
        }

        /// <summary>
        /// Throws an exception which wraps the stored error.
        /// </summary>
        /// <exception cref="OperationCanceledException">The stored error is an <see cref="OperationCanceledException"/>.</exception>
        /// <exception cref="TargetInvocationException">The stored error is any other exception.</exception>
        protected void Rethrow() {
            Debug.Assert(m_error != null);
            if (m_error is OperationCanceledException)
                throw new OperationCanceledException("Operation cancelled", m_error);
            else
                throw new TargetInvocationException(m_error);
        }
        #endregion

        #region handlers management

        /// <summary>
        /// Registers the handler, if this object is already completed the handler is
        /// signalled immediately.
        /// </summary>
        /// <param name="handler">The handler to register.</param>
        protected void AddHandler(THandler handler) {

            if (m_state > TRANSITIONAL_STATE) {
                // the promise is in the resolved state, just invoke the handler
                SignalHandler(handler, m_state);
            } else {
                if (Interlocked.CompareExchange(ref m_handler, handler, null) != null) {
                    if (m_extraHandlers == null)
                        // compare-exchange protects from losing the already created queue
                        Interlocked.CompareExchange(ref m_extraHandlers, new SimpleAsyncQueue<THandler>(), null);
                    m_extraHandlers.Enqueue(handler);
                }

                if (m_state > TRANSITIONAL_STATE && TryDequeueHandler(out handler))
                    // if the promise has been resolved while we were adding the handler to the queue
                    // we can't guarantee that someone is still processing it,
                    // therefore we need to fetch a handler from the queue and execute it.
                    // Note that the fetched handler may be not the one that we have added,
                    // we may even fetch no handlers at all :)
                    SignalHandler(handler, m_state);
            }

        }

        // Takes the handler from the field first, then falls back to the queue of extra handlers.
        bool TryDequeueHandler(out THandler handler) {
            handler = Interlocked.Exchange(ref m_handler, null);
            if (handler != null)
                return true;
            return m_extraHandlers != null && m_extraHandlers.TryDequeue(out handler);
        }

        #endregion

        #region IPromise implementation

        /// <summary>
        /// Gets a value indicating whether this object has reached a final state,
        /// either succeeded or rejected.
        /// </summary>
        public bool IsFulfilled {
            get {
                return m_state > TRANSITIONAL_STATE;
            }
        }

        /// <summary>
        /// Gets a value indicating whether this object is in the rejected state.
        /// </summary>
        public bool IsRejected {
            get {
                return m_state == REJECTED_STATE;
            }
        }

        #endregion

        /// <summary>
        /// Gets the error stored by <see cref="SetError"/>, or <c>null</c> if no error has been stored.
        /// </summary>
        public Exception RejectReason {
            get {
                return m_error;
            }
        }

    }
}
| 170 | 
