Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Runtime.CompilerServices;
using System.Threading;
namespace System.Reactive.Concurrency
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static class Synchronization
{
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private sealed class SubscribeOnObservable<[System.Runtime.CompilerServices.Nullable(2)] TSource> : ObservableBase<TSource>
{
[System.Runtime.CompilerServices.NullableContext(0)]
private sealed class Subscription : IDisposable
{
private SerialDisposableValue _cancel;
[System.Runtime.CompilerServices.NullableContext(1)]
public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
{
_cancel.TrySetFirst(scheduler.Schedule<(Subscription, IObservable<TSource>, IObserver<TSource>)>((this, source, observer), (Func<IScheduler, (Subscription, IObservable<TSource>, IObserver<TSource>), IDisposable>)delegate(IScheduler closureScheduler, (Subscription this, IObservable<TSource> source, IObserver<TSource> observer) state) {
state.this._cancel.Disposable = new ScheduledDisposable(closureScheduler, ObservableExtensions.SubscribeSafe<TSource>(state.source, state.observer));
return Disposable.Empty;
}));
}
public void Dispose()
{
_cancel.Dispose();
}
}
private readonly IObservable<TSource> _source;
private readonly IScheduler _scheduler;
public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
{
_source = source;
_scheduler = scheduler;
}
protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
return new Subscription(_source, _scheduler, observer);
}
}
[System.Runtime.CompilerServices.Nullable(new byte[] {
0,
1
})]
private sealed class SubscribeOnCtxObservable<[System.Runtime.CompilerServices.Nullable(2)] TSource> : ObservableBase<TSource>
{
[System.Runtime.CompilerServices.Nullable(0)]
private sealed class Subscription : IDisposable
{
private readonly IObservable<TSource> _source;
private readonly IObserver<TSource> _observer;
private readonly SynchronizationContext _context;
private SingleAssignmentDisposableValue _cancel;
public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
{
_source = source;
_context = context;
_observer = observer;
SynchronizationContextExtensions.PostWithStartComplete<Subscription>(context, (Action<Subscription>)delegate(Subscription this) {
if (!this._cancel.IsDisposed)
this._cancel.Disposable = new ContextDisposable(this._context, ObservableExtensions.SubscribeSafe<TSource>(this._source, this._observer));
}, this);
}
public void Dispose()
{
_cancel.Dispose();
}
}
private readonly IObservable<TSource> _source;
private readonly SynchronizationContext _context;
public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
{
_source = source;
_context = context;
}
protected override IDisposable SubscribeCore(IObserver<TSource> observer)
{
return new Subscription(_source, _context, observer);
}
}
public static IObservable<TSource> SubscribeOn<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return new SubscribeOnObservable<TSource>(source, scheduler);
}
public static IObservable<TSource> SubscribeOn<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException("source");
if (context == null)
throw new ArgumentNullException("context");
return new SubscribeOnCtxObservable<TSource>(source, context);
}
public static IObservable<TSource> ObserveOn<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
ISchedulerLongRunning schedulerLongRunning = scheduler.AsLongRunning();
if (schedulerLongRunning != null)
return new ObserveOn<TSource>.SchedulerLongRunning(source, schedulerLongRunning);
return new ObserveOn<TSource>.Scheduler(source, scheduler);
}
public static IObservable<TSource> ObserveOn<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException("source");
if (context == null)
throw new ArgumentNullException("context");
return new ObserveOn<TSource>.Context(source, context);
}
public static IObservable<TSource> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source)
{
if (source == null)
throw new ArgumentNullException("source");
return new Synchronize<TSource>(source);
}
public static IObservable<TSource> Synchronize<[System.Runtime.CompilerServices.Nullable(2)] TSource>(IObservable<TSource> source, object gate)
{
if (source == null)
throw new ArgumentNullException("source");
if (gate == null)
throw new ArgumentNullException("gate");
return new Synchronize<TSource>(source, gate);
}
}
}