Synchronization
Provides basic synchronization and scheduling services for observable sequences.
using System.ComponentModel;
using System.Reactive.Disposables;
using System.Threading;
namespace System.Reactive.Concurrency
{
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static class Synchronization
{
/// <summary>
/// Observable that defers subscribing to the wrapped source to a work item
/// scheduled on the supplied <see cref="IScheduler"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the wrapped sequence.</typeparam>
private sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
{
    /// <summary>
    /// Disposable handed back to the subscriber. Its single <c>_cancel</c> slot first
    /// receives the scheduled work item and, once that work runs, the wrapped
    /// subscription to the source.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
        private IDisposable _cancel;

        /// <summary>
        /// Schedules the subscription of <paramref name="observer"/> to
        /// <paramref name="source"/> on <paramref name="scheduler"/>.
        /// </summary>
        public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
        {
            // All state travels through the tuple argument, so the callback captures nothing.
            // `this` is a reserved keyword; the tuple element must be spelled `@this`.
            Disposable.TrySetSingle(
                ref _cancel,
                scheduler.Schedule<(Subscription, IObservable<TSource>, IObserver<TSource>)>(
                    (this, source, observer),
                    (Func<IScheduler, (Subscription, IObservable<TSource>, IObserver<TSource>), IDisposable>)delegate (IScheduler closureScheduler, (Subscription @this, IObservable<TSource> source, IObserver<TSource> observer) state)
                    {
                        // Replace the scheduled-work handle with the real subscription, wrapped
                        // in a ScheduledDisposable bound to the scheduler that ran this callback.
                        // NOTE(review): presumably this makes the unsubscribe run on that scheduler — confirm.
                        Disposable.TrySetSerial(
                            ref state.@this._cancel,
                            new ScheduledDisposable(
                                closureScheduler,
                                ObservableExtensions.SubscribeSafe<TSource>(state.source, state.observer)));

                        // The subscription is tracked through _cancel, so the work item itself
                        // has nothing further to hand back.
                        return Disposable.Empty;
                    }));
        }

        /// <summary>
        /// Disposes whatever is currently stored in the <c>_cancel</c> slot.
        /// </summary>
        public void Dispose()
        {
            Disposable.TryDispose(ref _cancel);
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly IScheduler _scheduler;

    /// <summary>
    /// Stores the source sequence and the scheduler used to subscribe to it.
    /// </summary>
    public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
    {
        _source = source;
        _scheduler = scheduler;
    }

    /// <summary>
    /// Creates a <see cref="Subscription"/> for <paramref name="observer"/>; the
    /// constructor of that object schedules the actual subscribe call.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver<TSource> observer)
    {
        return new Subscription(_source, _scheduler, observer);
    }
}
/// <summary>
/// Observable that defers subscribing to the wrapped source to a callback
/// posted to the supplied <see cref="SynchronizationContext"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the wrapped sequence.</typeparam>
private sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
{
    /// <summary>
    /// Disposable handed back to the subscriber. It carries the state needed by the
    /// posted callback and the <c>_cancel</c> slot that receives the wrapped subscription.
    /// </summary>
    private sealed class Subscription : IDisposable
    {
        private readonly IObservable<TSource> _source;
        private readonly IObserver<TSource> _observer;
        private readonly SynchronizationContext _context;
        private IDisposable _cancel;

        /// <summary>
        /// Captures the arguments and posts the subscribe callback to <paramref name="context"/>.
        /// </summary>
        public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
        {
            _source = source;
            _context = context;
            _observer = observer;

            // The instance is passed as explicit state, so the callback captures nothing.
            // `this` is a reserved keyword; the parameter must be spelled `@this`.
            SynchronizationContextExtensions.PostWithStartComplete<Subscription>(
                context,
                (Action<Subscription>)delegate (Subscription @this)
                {
                    // Skip subscribing if the handle was already disposed before the callback ran.
                    if (!Disposable.GetIsDisposed(ref @this._cancel))
                    {
                        // NOTE(review): ContextDisposable presumably disposes the inner
                        // subscription on the same context — confirm.
                        Disposable.SetSingle(
                            ref @this._cancel,
                            new ContextDisposable(
                                @this._context,
                                ObservableExtensions.SubscribeSafe<TSource>(@this._source, @this._observer)));
                    }
                },
                this);
        }

        /// <summary>
        /// Disposes whatever is currently stored in the <c>_cancel</c> slot.
        /// </summary>
        public void Dispose()
        {
            Disposable.TryDispose(ref _cancel);
        }
    }

    private readonly IObservable<TSource> _source;
    private readonly SynchronizationContext _context;

    /// <summary>
    /// Stores the source sequence and the synchronization context used to subscribe to it.
    /// </summary>
    public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
    {
        _source = source;
        _context = context;
    }

    /// <summary>
    /// Creates a <see cref="Subscription"/> for <paramref name="observer"/>; the
    /// constructor of that object posts the actual subscribe call.
    /// </summary>
    protected override IDisposable SubscribeCore(IObserver<TSource> observer)
    {
        return new Subscription(_source, _context, observer);
    }
}
/// <summary>
/// Wraps <paramref name="source"/> so that subscribing to it is performed by work
/// scheduled on <paramref name="scheduler"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <param name="scheduler">Scheduler on which the subscribe call is scheduled.</param>
/// <returns>An observable whose subscriptions are set up via the scheduler.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
/// </exception>
public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    return new SubscribeOnObservable<TSource>(source, scheduler);
}
/// <summary>
/// Wraps <paramref name="source"/> so that subscribing to it is performed by a
/// callback posted to <paramref name="context"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <param name="context">Synchronization context to which the subscribe call is posted.</param>
/// <returns>An observable whose subscriptions are set up via the synchronization context.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="context"/> is <c>null</c>.
/// </exception>
public static IObservable<TSource> SubscribeOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (context == null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    return new SubscribeOnCtxObservable<TSource>(source, context);
}
/// <summary>
/// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
/// <paramref name="scheduler"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <param name="scheduler">Scheduler handed to the operator.</param>
/// <returns>
/// The long-running operator variant when <c>scheduler.AsLongRunning()</c> yields a
/// non-null scheduler; otherwise the regular scheduler variant.
/// </returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="scheduler"/> is <c>null</c>.
/// </exception>
public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, IScheduler scheduler)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (scheduler == null)
    {
        throw new ArgumentNullException(nameof(scheduler));
    }

    // Use the long-running implementation whenever the scheduler offers that capability.
    ISchedulerLongRunning longRunning = scheduler.AsLongRunning();
    if (longRunning != null)
    {
        return new ObserveOn<TSource>.SchedulerLongRunning(source, longRunning);
    }

    return new ObserveOn<TSource>.Scheduler(source, scheduler);
}
/// <summary>
/// Wraps <paramref name="source"/> in an <c>ObserveOn</c> operator bound to
/// <paramref name="context"/>.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <param name="context">Synchronization context handed to the operator.</param>
/// <returns>The context-based <c>ObserveOn</c> operator over <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="context"/> is <c>null</c>.
/// </exception>
public static IObservable<TSource> ObserveOn<TSource>(IObservable<TSource> source, SynchronizationContext context)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (context == null)
    {
        throw new ArgumentNullException(nameof(context));
    }

    return new ObserveOn<TSource>.Context(source, context);
}
/// <summary>
/// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed
/// without an explicit gate object.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <returns>The <c>Synchronize</c> operator over <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    return new Synchronize<TSource>(source);
}
/// <summary>
/// Wraps <paramref name="source"/> in a <c>Synchronize</c> operator constructed with
/// the caller-supplied <paramref name="gate"/> object.
/// </summary>
/// <typeparam name="TSource">Element type of the sequence.</typeparam>
/// <param name="source">Sequence to wrap.</param>
/// <param name="gate">
/// Object passed through to the operator.
/// NOTE(review): presumably used as the lock around observer callbacks — confirm against the operator.
/// </param>
/// <returns>The <c>Synchronize</c> operator over <paramref name="source"/>.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="gate"/> is <c>null</c>.
/// </exception>
public static IObservable<TSource> Synchronize<TSource>(IObservable<TSource> source, object gate)
{
    if (source == null)
    {
        throw new ArgumentNullException(nameof(source));
    }

    if (gate == null)
    {
        throw new ArgumentNullException(nameof(gate));
    }

    return new Synchronize<TSource>(source, gate);
}
}
}