<PackageReference Include="System.Reactive" Version="4.1.5" />

TaskObservableMethodBuilder<T>

public struct TaskObservableMethodBuilder<T>
Represents a builder for asynchronous methods that return a task-like ITaskObservable<T>.
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
using System.Security;

namespace System.Runtime.CompilerServices
{
    /// <summary>
    /// Represents a builder for asynchronous methods that return a task-like <see cref="ITaskObservable{T}"/>.
    /// </summary>
    /// <typeparam name="T">Type of the result produced by the asynchronous method.</typeparam>
    public struct TaskObservableMethodBuilder<T>
    {
        /// <summary>
        /// Observable handed out by the builder. It also acts as its own awaiter.
        /// </summary>
        /// <remarks>
        /// An instance is in exactly one of three shapes, fixed by the constructor used:
        /// backed by an <see cref="AsyncSubject{T}"/> (outcome not yet known at construction),
        /// already completed with a result, or already completed with an exception.
        /// </remarks>
        internal sealed class TaskObservable : ITaskObservable<T>, IObservable<T>, ITaskObservableAwaiter<T>, INotifyCompletion
        {
            // Non-null only for instances created through the parameterless constructor.
            private readonly AsyncSubject<T> _subject;

            // Outcome captured by the result/exception constructors; unused when _subject is set.
            private readonly T _result;
            private readonly Exception _exception;

            /// <summary>
            /// Creates an observable whose outcome is supplied later through
            /// <see cref="SetResult"/> or <see cref="SetException"/>.
            /// </summary>
            public TaskObservable()
            {
                _subject = new AsyncSubject<T>();
            }

            /// <summary>Creates an observable that has already completed with <paramref name="result"/>.</summary>
            /// <param name="result">The result to hand to subscribers and awaiters.</param>
            public TaskObservable(T result)
            {
                _result = result;
            }

            /// <summary>Creates an observable that has already failed with <paramref name="exception"/>.</summary>
            /// <param name="exception">The error to hand to subscribers and awaiters.</param>
            public TaskObservable(Exception exception)
            {
                _exception = exception;
            }

            /// <summary>
            /// Gets whether the outcome is known: always <c>true</c> without a subject,
            /// otherwise whatever the subject reports.
            /// </summary>
            public bool IsCompleted => _subject?.IsCompleted ?? true;

            /// <summary>Completes the subject-backed observable with <paramref name="result"/>.</summary>
            /// <param name="result">The value to publish.</param>
            /// <exception cref="InvalidOperationException">The observable is already completed.</exception>
            public void SetResult(T result)
            {
                // IsCompleted is true whenever _subject is null, so _subject is safe to use below.
                if (IsCompleted)
                {
                    throw new InvalidOperationException();
                }

                _subject.OnNext(result);
                _subject.OnCompleted();
            }

            /// <summary>Fails the subject-backed observable with <paramref name="exception"/>.</summary>
            /// <param name="exception">The error to publish.</param>
            /// <exception cref="InvalidOperationException">The observable is already completed.</exception>
            public void SetException(Exception exception)
            {
                if (IsCompleted)
                {
                    throw new InvalidOperationException();
                }

                _subject.OnError(exception);
            }

            /// <summary>Subscribes <paramref name="observer"/> to the outcome of the asynchronous method.</summary>
            /// <param name="observer">The observer to notify.</param>
            /// <returns>
            /// The subject's subscription when subject-backed; otherwise <see cref="Disposable.Empty"/>,
            /// because every notification has been delivered before this method returns.
            /// </returns>
            /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                if (observer == null)
                {
                    throw new ArgumentNullException(nameof(observer));
                }

                if (_subject != null)
                {
                    return _subject.Subscribe(observer);
                }

                if (_exception != null)
                {
                    observer.OnError(_exception);
                    return Disposable.Empty;
                }

                // Mirror SetResult on the subject-backed path (OnNext followed by OnCompleted):
                // without the terminal notification a subscriber would wait forever for the
                // sequence to end.
                observer.OnNext(_result);
                observer.OnCompleted();
                return Disposable.Empty;
            }

            /// <summary>Returns this instance, which serves as its own awaiter.</summary>
            public ITaskObservableAwaiter<T> GetAwaiter()
            {
                return this;
            }

            /// <summary>
            /// Gets the result: from the subject when subject-backed; otherwise the stored result,
            /// after giving a stored exception (if any) the chance to be thrown.
            /// </summary>
            public T GetResult()
            {
                if (_subject != null)
                {
                    return _subject.GetResult();
                }

                _exception.ThrowIfNotNull();
                return _result;
            }

            /// <summary>
            /// Registers <paramref name="continuation"/> with the subject when subject-backed;
            /// otherwise invokes it immediately on the calling thread.
            /// </summary>
            /// <param name="continuation">The action to run.</param>
            /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is <c>null</c>.</exception>
            public void OnCompleted(Action continuation)
            {
                if (continuation == null)
                {
                    throw new ArgumentNullException(nameof(continuation));
                }

                if (_subject != null)
                {
                    _subject.OnCompleted(continuation);
                }
                else
                {
                    continuation();
                }
            }
        }

        // Reference to the state machine driven by this builder; set at most once.
        private IAsyncStateMachine _stateMachine;

        // Observable returned from the async method; created lazily (see Task, SetResult, SetException).
        private TaskObservable _inner;

        /// <summary>
        /// Gets the observable for this builder, creating a subject-backed one on first access
        /// if no outcome has been recorded yet.
        /// </summary>
        public ITaskObservable<T> Task => _inner ?? (_inner = new TaskObservable());

        /// <summary>Creates a builder in its default (empty) state.</summary>
        public static TaskObservableMethodBuilder<T> Create()
        {
            return default(TaskObservableMethodBuilder<T>);
        }

        /// <summary>Begins running the builder with the associated state machine.</summary>
        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
        /// <param name="stateMachine">The state machine instance, passed by reference.</param>
        /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
        public void Start<TStateMachine>(ref TStateMachine stateMachine)
            where TStateMachine : IAsyncStateMachine
        {
            if (stateMachine == null)
            {
                throw new ArgumentNullException(nameof(stateMachine));
            }

            stateMachine.MoveNext();
        }

        /// <summary>Associates the builder with the specified state machine.</summary>
        /// <param name="stateMachine">The state machine instance to associate with the builder.</param>
        /// <exception cref="InvalidOperationException">A state machine was already set.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
        public void SetStateMachine(IAsyncStateMachine stateMachine)
        {
            // The "already set" check intentionally comes first, as in the original.
            if (_stateMachine != null)
            {
                throw new InvalidOperationException();
            }

            if (stateMachine == null)
            {
                throw new ArgumentNullException(nameof(stateMachine));
            }

            _stateMachine = stateMachine;
        }

        /// <summary>
        /// Records a successful outcome: creates an already-completed observable if none exists yet,
        /// otherwise completes the existing one.
        /// </summary>
        /// <param name="result">The result of the asynchronous method.</param>
        /// <exception cref="InvalidOperationException">The existing observable is already completed.</exception>
        public void SetResult(T result)
        {
            if (_inner == null)
            {
                _inner = new TaskObservable(result);
            }
            else
            {
                _inner.SetResult(result);
            }
        }

        /// <summary>
        /// Records a failed outcome: creates an already-failed observable if none exists yet,
        /// otherwise fails the existing one.
        /// </summary>
        /// <param name="exception">The exception to bind to the observable.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
        /// <exception cref="InvalidOperationException">The existing observable is already completed.</exception>
        public void SetException(Exception exception)
        {
            if (exception == null)
            {
                throw new ArgumentNullException(nameof(exception));
            }

            if (_inner == null)
            {
                _inner = new TaskObservable(exception);
            }
            else
            {
                _inner.SetException(exception);
            }
        }

        /// <summary>
        /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
        /// </summary>
        /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
        /// <param name="awaiter">The awaiter.</param>
        /// <param name="stateMachine">The state machine.</param>
        public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : INotifyCompletion
            where TStateMachine : IAsyncStateMachine
        {
            try
            {
                IAsyncStateMachine machine = EnsureStateMachine(ref stateMachine);
                awaiter.OnCompleted(machine.MoveNext);
            }
            catch (Exception exception)
            {
                // Not propagated to the caller; re-raised through the default scheduler instead.
                Rethrow(exception);
            }
        }

        /// <summary>
        /// Schedules the state machine to proceed to the next action when the specified awaiter completes,
        /// using <see cref="ICriticalNotifyCompletion.UnsafeOnCompleted"/>.
        /// </summary>
        /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
        /// <param name="awaiter">The awaiter.</param>
        /// <param name="stateMachine">The state machine.</param>
        [SecuritySafeCritical]
        public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
            where TAwaiter : ICriticalNotifyCompletion
            where TStateMachine : IAsyncStateMachine
        {
            try
            {
                IAsyncStateMachine machine = EnsureStateMachine(ref stateMachine);
                awaiter.UnsafeOnCompleted(machine.MoveNext);
            }
            catch (Exception exception)
            {
                // Not propagated to the caller; re-raised through the default scheduler instead.
                Rethrow(exception);
            }
        }

        // Returns the state machine reference to continue on, capturing it on the first await.
        private IAsyncStateMachine EnsureStateMachine<TStateMachine>(ref TStateMachine stateMachine)
            where TStateMachine : IAsyncStateMachine
        {
            if (_stateMachine == null)
            {
                // Order matters: materialise the subject-backed observable BEFORE converting the
                // state machine to an interface reference. For a value-type state machine that
                // conversion boxes (copies) it; presumably the copy carries this builder, so it
                // must already reference the same observable - TODO confirm against compiler output.
                if (_inner == null)
                {
                    _inner = new TaskObservable();
                }

                _stateMachine = stateMachine;
                _stateMachine.SetStateMachine(_stateMachine);
            }

            return _stateMachine;
        }

        // Re-raises the exception from work scheduled on Scheduler.Default rather than on the current call stack.
        private static void Rethrow(Exception exception)
        {
            Scheduler.Schedule<Exception>(Scheduler.Default, exception, (ex, recurse) => ex.Throw());
        }
    }
}