ServerSentEventReader
An SSE event reader that reads lines from an SSE stream and composes them
into SSE events.
See SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html
using System.Collections.Generic;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace System.ClientModel.Internal
{
[System.Runtime.CompilerServices.NullableContext(1)]
[System.Runtime.CompilerServices.Nullable(0)]
internal sealed class ServerSentEventReader
{
[System.Runtime.CompilerServices.Nullable(0)]
private struct PendingEvent
{
private const char LF = '\n';
[System.Runtime.CompilerServices.Nullable(2)]
private List<ServerSentEventField> _dataFields;
public int DataLength {
[System.Runtime.CompilerServices.IsReadOnly]
get;
set;
}
public List<ServerSentEventField> DataFields => _dataFields ?? (_dataFields = new List<ServerSentEventField>());
public ServerSentEventField? EventTypeField {
[System.Runtime.CompilerServices.IsReadOnly]
get;
set;
}
public ServerSentEvent ToEvent()
{
object obj;
ReadOnlyMemory<char> value;
if (!EventTypeField.HasValue)
obj = "message";
else {
value = EventTypeField.Value.Value;
obj = value.ToString();
}
string type = (string)obj;
Memory<char> memory = new Memory<char>(new char[DataLength]);
int num = 0;
foreach (ServerSentEventField dataField in DataFields) {
value = dataField.Value;
ReadOnlySpan<char> span = value.Span;
Span<char> span2 = memory.Span;
span.CopyTo(span2.Slice(num));
span2 = memory.Span;
int num2 = num;
value = dataField.Value;
span2[num2 + value.Length] = '\n';
int num3 = num;
value = dataField.Value;
num = num3 + (value.Length + 1);
}
string data = memory.Slice(0, memory.Length - 1).ToString();
return new ServerSentEvent(type, data);
}
}
private readonly StreamReader _reader;
public string LastEventId { get; set; }
public TimeSpan ReconnectionInterval { get; set; }
public ServerSentEventReader(Stream stream)
{
Argument.AssertNotNull(stream, "stream");
_reader = new StreamReader(stream);
LastEventId = string.Empty;
ReconnectionInterval = Timeout.InfiniteTimeSpan;
}
public ServerSentEvent? TryGetNextEvent(CancellationToken cancellationToken = default(CancellationToken))
{
PendingEvent pending = default(PendingEvent);
bool dispatch;
do {
cancellationToken.ThrowIfCancellationRequested();
string text = _reader.ReadLine();
if (text == null)
return null;
ProcessLine(text, ref pending, out dispatch);
} while (!dispatch);
return pending.ToEvent();
}
[AsyncStateMachine(typeof(<TryGetNextEventAsync>d__11))]
public Task<ServerSentEvent?> TryGetNextEventAsync(CancellationToken cancellationToken = default(CancellationToken))
{
<TryGetNextEventAsync>d__11 stateMachine = default(<TryGetNextEventAsync>d__11);
stateMachine.<>t__builder = AsyncTaskMethodBuilder<ServerSentEvent?>.Create();
stateMachine.<>4__this = this;
stateMachine.cancellationToken = cancellationToken;
stateMachine.<>1__state = -1;
stateMachine.<>t__builder.Start(ref stateMachine);
return stateMachine.<>t__builder.Task;
}
private void ProcessLine(string line, ref PendingEvent pending, out bool dispatch)
{
dispatch = false;
if (line.Length == 0) {
if (pending.DataLength == 0)
pending = default(PendingEvent);
else
dispatch = true;
} else if (line[0] != ':') {
ServerSentEventField serverSentEventField = new ServerSentEventField(line);
ReadOnlyMemory<char> value;
switch (serverSentEventField.FieldType) {
case ServerSentEventFieldKind.Event:
pending.EventTypeField = serverSentEventField;
break;
case ServerSentEventFieldKind.Data: {
int dataLength = pending.DataLength;
value = serverSentEventField.Value;
pending.DataLength = dataLength + (value.Length + 1);
pending.DataFields.Add(serverSentEventField);
break;
}
case ServerSentEventFieldKind.Id:
value = serverSentEventField.Value;
LastEventId = value.ToString();
break;
case ServerSentEventFieldKind.Retry:
value = serverSentEventField.Value;
if (value.Length > 0) {
value = serverSentEventField.Value;
if (int.TryParse(value.ToString(), out int result))
ReconnectionInterval = TimeSpan.FromMilliseconds((double)result);
}
break;
}
}
}
}
}