Latest<TSource>
using System.
Threading;
namespace System.
Reactive.
Linq.
ObservableImpl
{
/// <summary>
/// Push-to-pull adapter whose sink keeps only the most recently received
/// notification and hands it out through <c>TryMoveNext</c>.
/// </summary>
/// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
internal sealed class Latest<TSource> : PushToPullAdapter<TSource, TSource>
{
    /// <summary>
    /// Sink holding a single notification slot. Each incoming notification
    /// overwrites the slot; a semaphore with a maximum count of one tells the
    /// pulling side that the slot has been filled since it was last read.
    /// </summary>
    private sealed class _ : PushToPullSink<TSource, TSource>
    {
        // Guards the slot fields below (_notificationAvailable, _kind, _value, _error).
        private readonly object _gate;

        // Count is 1 while an unread notification sits in the slot, 0 otherwise.
        private readonly SemaphoreSlim _semaphore;

        // True while the slot holds a notification that TryMoveNext has not consumed yet.
        private bool _notificationAvailable;

        // Kind of the notification currently stored in the slot.
        private NotificationKind _kind;

        // Payload of the stored notification when _kind is OnNext.
        private TSource _value;

        // Payload of the stored notification when _kind is OnError.
        private Exception _error;

        /// <summary>
        /// Creates the sink with an empty slot.
        /// </summary>
        /// <param name="subscription">Passed through to the base sink's constructor.</param>
        public _(IDisposable subscription)
            : base(subscription)
        {
            _gate = new object();
            _semaphore = new SemaphoreSlim(0, 1);
        }

        /// <summary>
        /// Stores <paramref name="value"/> as the latest notification, replacing
        /// any notification that has not been consumed yet.
        /// </summary>
        /// <param name="value">Element received from the source.</param>
        public override void OnNext(TSource value)
        {
            bool slotWasEmpty;

            lock (_gate)
            {
                slotWasEmpty = !_notificationAvailable;
                _notificationAvailable = true;
                _kind = NotificationKind.OnNext;
                _value = value;
            }

            // Release only on the empty -> filled transition; the semaphore's
            // maximum count is 1, so releasing while it is already signalled
            // would exceed that maximum.
            if (slotWasEmpty)
            {
                _semaphore.Release();
            }
        }

        /// <summary>
        /// Disposes the sink and stores <paramref name="error"/> as the latest
        /// notification, replacing any notification that has not been consumed yet.
        /// </summary>
        /// <param name="error">Exception reported by the source.</param>
        public override void OnError(Exception error)
        {
            // NOTE(review): Dispose() is inherited; presumably it tears down the
            // source subscription - confirm against PushToPullSink.
            Dispose();

            bool slotWasEmpty;

            lock (_gate)
            {
                slotWasEmpty = !_notificationAvailable;
                _notificationAvailable = true;
                _kind = NotificationKind.OnError;
                _error = error;
            }

            if (slotWasEmpty)
            {
                _semaphore.Release();
            }
        }

        /// <summary>
        /// Disposes the sink and stores a completion marker as the latest
        /// notification, replacing any notification that has not been consumed yet.
        /// </summary>
        public override void OnCompleted()
        {
            // NOTE(review): Dispose() is inherited; presumably it tears down the
            // source subscription - confirm against PushToPullSink.
            Dispose();

            bool slotWasEmpty;

            lock (_gate)
            {
                slotWasEmpty = !_notificationAvailable;
                _notificationAvailable = true;
                _kind = NotificationKind.OnCompleted;
            }

            if (slotWasEmpty)
            {
                _semaphore.Release();
            }
        }

        /// <summary>
        /// Blocks until a notification is available, consumes it and reports it.
        /// </summary>
        /// <param name="current">
        /// The stored element when the consumed notification is an OnNext;
        /// otherwise <c>default(TSource)</c>.
        /// </param>
        /// <returns>
        /// <c>true</c> when an element was consumed; <c>false</c> for any other
        /// notification kind that does not throw.
        /// </returns>
        /// <remarks>
        /// When the consumed notification is an OnError, the stored exception is
        /// raised through its <c>Throw()</c> extension.
        /// </remarks>
        public override bool TryMoveNext(out TSource current)
        {
            NotificationKind kind = NotificationKind.OnNext;
            TSource value = default(TSource);
            Exception error = null;

            _semaphore.Wait();

            lock (_gate)
            {
                kind = _kind;

                switch (kind)
                {
                    case NotificationKind.OnNext:
                        // Snapshot the value while holding the gate. Reading _value
                        // after leaving the lock would race with a concurrent OnNext:
                        // the newer value could be returned here and then returned
                        // again by the next call.
                        value = _value;
                        break;
                    case NotificationKind.OnError:
                        error = _error;
                        break;
                }

                // Mark the slot as consumed so the next notification releases
                // the semaphore again.
                _notificationAvailable = false;
            }

            switch (kind)
            {
                case NotificationKind.OnNext:
                    current = value;
                    return true;
                case NotificationKind.OnError:
                    error.Throw();
                    break;
            }

            current = default(TSource);
            return false;
        }
    }

    /// <summary>
    /// Creates the adapter over <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Observable sequence passed to the base adapter.</param>
    public Latest(IObservable<TSource> source)
        : base(source)
    {
    }

    /// <summary>
    /// Creates the sink that buffers the latest notification.
    /// </summary>
    /// <param name="subscription">Disposable handed to the new sink.</param>
    /// <returns>A new sink instance wrapping <paramref name="subscription"/>.</returns>
    protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
    {
        return new _(subscription);
    }
}
}