TaskObservableMethodBuilder<T>
Represents a builder for asynchronous methods that return a task-like ITaskObservable<T>.
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Security;
namespace System.Runtime.CompilerServices
{
public struct TaskObservableMethodBuilder<T>
{
/// <summary>
/// Task-like observable produced by <see cref="TaskObservableMethodBuilder{T}"/>; it also serves as its own awaiter.
/// </summary>
/// <remarks>
/// The constructor used fixes which of three shapes an instance has:
/// subject-backed (the outcome is supplied later through <c>SetResult</c> or <c>SetException</c>),
/// already completed with a result, or already completed with an exception.
/// </remarks>
internal sealed class TaskObservable : ITaskObservable<T>, IObservable<T>, ITaskObservableAwaiter<T>, INotifyCompletion
{
    // Non-null only for the subject-backed shape; null means the outcome was fixed at construction.
    private readonly AsyncSubject<T> _subject;

    // Outcome of an instance that was constructed already completed; not consulted while _subject is set.
    private readonly T _result;
    private readonly Exception _exception;

    /// <summary>
    /// Gets whether an outcome is available: always <c>true</c> when there is no subject,
    /// otherwise whatever the subject reports.
    /// </summary>
    public bool IsCompleted
    {
        get { return _subject == null || _subject.IsCompleted; }
    }

    /// <summary>Creates a pending instance whose outcome is delivered later through a new subject.</summary>
    public TaskObservable()
    {
        _subject = new AsyncSubject<T>();
    }

    /// <summary>Creates an instance that is already completed with <paramref name="result"/>.</summary>
    /// <param name="result">The value handed to subscribers and returned by <see cref="GetResult"/>.</param>
    public TaskObservable(T result)
    {
        _result = result;
    }

    /// <summary>Creates an instance that is already completed with <paramref name="exception"/>.</summary>
    /// <param name="exception">The error handed to subscribers.</param>
    public TaskObservable(Exception exception)
    {
        _exception = exception;
    }

    /// <summary>Completes a pending instance by sending the value and then completion to the subject.</summary>
    /// <param name="result">The value to publish.</param>
    /// <exception cref="InvalidOperationException">The instance is already completed.</exception>
    public void SetResult(T result)
    {
        // IsCompleted is also true when there is no subject, so this guard protects the dereferences below.
        if (IsCompleted)
        {
            throw new InvalidOperationException();
        }

        _subject.OnNext(result);
        _subject.OnCompleted();
    }

    /// <summary>Completes a pending instance by sending the error to the subject.</summary>
    /// <param name="exception">The error to publish.</param>
    /// <exception cref="InvalidOperationException">The instance is already completed.</exception>
    public void SetException(Exception exception)
    {
        if (IsCompleted)
        {
            throw new InvalidOperationException();
        }

        _subject.OnError(exception);
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/>. A subject-backed instance forwards the subscription to the subject;
    /// a pre-completed instance notifies the observer immediately and returns an empty disposable.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>The subject's subscription handle, or <c>Disposable.Empty</c> for a pre-completed instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        if (_subject != null)
        {
            return _subject.Subscribe(observer);
        }

        if (_exception != null)
        {
            observer.OnError(_exception);
        }
        else
        {
            observer.OnNext(_result);

            // The pending path (SetResult) sends OnNext followed by OnCompleted; the pre-completed path
            // has to terminate the sequence the same way, otherwise the observer never sees a terminal
            // notification.
            observer.OnCompleted();
        }

        return Disposable.Empty;
    }

    /// <summary>Returns this instance, which implements the awaiter members itself.</summary>
    public ITaskObservableAwaiter<T> GetAwaiter()
    {
        return this;
    }

    /// <summary>Returns the outcome: from the subject when present, otherwise from the stored result/exception.</summary>
    /// <returns>The subject's result, or the stored result for a pre-completed instance.</returns>
    public T GetResult()
    {
        if (_subject != null)
        {
            return _subject.GetResult();
        }

        // ThrowIfNotNull is a helper declared outside this file; presumably it rethrows the stored
        // exception when one is present - confirm against its definition.
        _exception.ThrowIfNotNull();
        return _result;
    }

    /// <summary>
    /// Registers <paramref name="continuation"/>: handed to the subject when present, otherwise invoked inline
    /// because the outcome is already known.
    /// </summary>
    /// <param name="continuation">The action to run once the outcome is available.</param>
    /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
    public void OnCompleted(Action continuation)
    {
        if (continuation == null)
        {
            throw new ArgumentNullException(nameof(continuation));
        }

        if (_subject != null)
        {
            _subject.OnCompleted(continuation);
        }
        else
        {
            continuation();
        }
    }
}
// State machine captured on the first await (or supplied through SetStateMachine); null until then.
private IAsyncStateMachine _stateMachine;

// Observable handed to the caller; created lazily by Task, SetResult or SetException.
private TaskObservable _inner;

/// <summary>
/// Gets the task-like observable for this builder, creating a pending one on first access.
/// </summary>
public ITaskObservable<T> Task
{
    get
    {
        if (_inner == null)
        {
            _inner = new TaskObservable();
        }

        return _inner;
    }
}
/// <summary>
/// Creates a builder in its default state (no state machine and no observable yet).
/// </summary>
/// <returns>A default-initialized <see cref="TaskObservableMethodBuilder{T}"/>.</returns>
public static TaskObservableMethodBuilder<T> Create() => default(TaskObservableMethodBuilder<T>);
/// <summary>
/// Begins running the state machine by performing its first <c>MoveNext</c> call.
/// </summary>
/// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
/// <param name="stateMachine">The state machine to advance, passed by reference.</param>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
public void Start<TStateMachine>(ref TStateMachine stateMachine)
    where TStateMachine : IAsyncStateMachine
{
    if (stateMachine == null)
    {
        throw new ArgumentNullException("stateMachine");
    }

    stateMachine.MoveNext();
}
/// <summary>
/// Associates this builder with the given state machine. May only succeed once per builder.
/// </summary>
/// <param name="stateMachine">The state machine to remember.</param>
/// <exception cref="InvalidOperationException">A state machine has already been set.</exception>
/// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
public void SetStateMachine(IAsyncStateMachine stateMachine)
{
    // The "already set" check deliberately comes first, so it wins over the null-argument check.
    bool alreadyAssigned = _stateMachine != null;
    if (alreadyAssigned)
    {
        throw new InvalidOperationException();
    }

    if (stateMachine == null)
    {
        throw new ArgumentNullException("stateMachine");
    }

    _stateMachine = stateMachine;
}
/// <summary>
/// Marks the asynchronous method as completed with <paramref name="result"/>.
/// </summary>
/// <param name="result">The value produced by the method.</param>
/// <remarks>
/// When an observable already exists it is completed with the value; otherwise an observable that is
/// already completed with the value is created and stored.
/// </remarks>
public void SetResult(T result)
{
    if (_inner != null)
    {
        _inner.SetResult(result);
        return;
    }

    // No observable has been created yet, so store one that already carries the result.
    _inner = new TaskObservable(result);
}
/// <summary>
/// Marks the asynchronous method as failed with <paramref name="exception"/>.
/// </summary>
/// <param name="exception">The error raised by the method.</param>
/// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
/// <remarks>
/// When an observable already exists the error is forwarded to it; otherwise an observable that is
/// already completed with the error is created and stored.
/// </remarks>
public void SetException(Exception exception)
{
    if (exception == null)
    {
        throw new ArgumentNullException("exception");
    }

    if (_inner != null)
    {
        _inner.SetException(exception);
        return;
    }

    // No observable has been created yet, so store one that already carries the error.
    _inner = new TaskObservable(exception);
}
/// <summary>
/// Schedules the state machine's next step to run when <paramref name="awaiter"/> completes.
/// </summary>
/// <typeparam name="TAwaiter">The awaiter type.</typeparam>
/// <typeparam name="TStateMachine">The state machine type.</typeparam>
/// <param name="awaiter">The awaiter to register the continuation with.</param>
/// <param name="stateMachine">The state machine to resume.</param>
/// <remarks>
/// Any exception raised while registering the continuation is passed to <c>Rethrow</c>
/// rather than propagating to the caller.
/// </remarks>
public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : INotifyCompletion
    where TStateMachine : IAsyncStateMachine
{
    try
    {
        if (_stateMachine == null)
        {
            // First await. The Task getter is read only for its side effect of creating _inner, and it
            // must run BEFORE the cast below (which boxes a value-type state machine). Presumably the
            // compiler-generated state machine embeds this builder, so the copy then shares the same
            // observable - confirm against the state machine layout.
            ITaskObservable<T> ensureCreated = Task;

            IAsyncStateMachine captured = (IAsyncStateMachine)(object)stateMachine;
            _stateMachine = captured;
            captured.SetStateMachine(captured);
        }

        Action resume = _stateMachine.MoveNext;
        awaiter.OnCompleted(resume);
    }
    catch (Exception error)
    {
        Rethrow(error);
    }
}
/// <summary>
/// Schedules the state machine's next step to run when <paramref name="awaiter"/> completes,
/// registering through the awaiter's <c>UnsafeOnCompleted</c> method.
/// </summary>
/// <typeparam name="TAwaiter">The awaiter type.</typeparam>
/// <typeparam name="TStateMachine">The state machine type.</typeparam>
/// <param name="awaiter">The awaiter to register the continuation with.</param>
/// <param name="stateMachine">The state machine to resume.</param>
/// <remarks>
/// Any exception raised while registering the continuation is passed to <c>Rethrow</c>
/// rather than propagating to the caller.
/// </remarks>
[SecuritySafeCritical]
public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
    where TAwaiter : ICriticalNotifyCompletion
    where TStateMachine : IAsyncStateMachine
{
    try
    {
        if (_stateMachine == null)
        {
            // First await. The Task getter is read only for its side effect of creating _inner, and it
            // must run BEFORE the cast below (which boxes a value-type state machine). Presumably the
            // compiler-generated state machine embeds this builder, so the copy then shares the same
            // observable - confirm against the state machine layout.
            ITaskObservable<T> ensureCreated = Task;

            IAsyncStateMachine captured = (IAsyncStateMachine)(object)stateMachine;
            _stateMachine = captured;
            captured.SetStateMachine(captured);
        }

        Action resume = _stateMachine.MoveNext;
        awaiter.UnsafeOnCompleted(resume);
    }
    catch (Exception error)
    {
        Rethrow(error);
    }
}
/// <summary>
/// Hands <paramref name="exception"/> to the default scheduler, where the scheduled action throws it.
/// </summary>
/// <param name="exception">The exception to surface on the scheduler.</param>
private static void Rethrow(Exception exception)
{
    IScheduler scheduler = Scheduler.Default;

    // Throw is a helper declared outside this file; presumably it rethrows the exception - confirm
    // against its definition. The second delegate parameter is never invoked here.
    Action<Exception, Action<Exception>> throwOnScheduler = (ex, recurse) =>
    {
        ex.Throw();
    };

    Scheduler.Schedule<Exception>(scheduler, exception, throwOnScheduler);
}
}
}