<PackageReference Include="System.ClientModel" Version="1.1.0-beta.7" />

ServerSentEventReader

sealed class ServerSentEventReader
An SSE event reader that reads lines from an SSE stream and composes them into SSE events. See the SSE specification: https://html.spec.whatwg.org/multipage/server-sent-events.html
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.ClientModel.Internal
{
    /// <summary>
    /// Reads a server-sent events (SSE) stream one line at a time and composes the
    /// lines into <see cref="ServerSentEvent"/> values.
    /// See https://html.spec.whatwg.org/multipage/server-sent-events.html
    /// </summary>
    // NOTE(review): this listing is decompiler output. Nullable annotations show up as
    // compiler-emitted attributes, and the body of TryGetNextEventAsync lives in a
    // compiler-generated state machine that is not part of this listing. Those pieces
    // are kept verbatim below rather than reconstructed.
    [System.Runtime.CompilerServices.NullableContext(1)]
    [System.Runtime.CompilerServices.Nullable(0)]
    internal sealed class ServerSentEventReader
    {
        /// <summary>
        /// Accumulates the fields of the event currently being read until a blank
        /// line dispatches it.
        /// </summary>
        [System.Runtime.CompilerServices.Nullable(0)]
        private struct PendingEvent
        {
            private const char LF = '\n';

            // Allocated lazily by DataFields, so an event without data costs no list.
            [System.Runtime.CompilerServices.Nullable(2)]
            private List<ServerSentEventField> _dataFields;

            /// <summary>
            /// Total number of characters needed to join all data fields, counting
            /// one trailing LF per field (so it is non-zero as soon as any data
            /// field has been added, even an empty one).
            /// </summary>
            public int DataLength { [System.Runtime.CompilerServices.IsReadOnly] get; set; }

            /// <summary>The data fields collected so far, in arrival order.</summary>
            public List<ServerSentEventField> DataFields => _dataFields ?? (_dataFields = new List<ServerSentEventField>());

            /// <summary>The most recent <c>event</c> field, if one was seen.</summary>
            public ServerSentEventField? EventTypeField { [System.Runtime.CompilerServices.IsReadOnly] get; set; }

            /// <summary>
            /// Builds the event from the collected fields: the type falls back to
            /// <c>"message"</c> when no event field was seen, and the data is every
            /// data field value joined with LF.
            /// </summary>
            /// <returns>The composed event.</returns>
            /// <exception cref="ArgumentOutOfRangeException">
            /// Thrown when <see cref="DataLength"/> is zero; callers only dispatch
            /// when at least one data field was collected.
            /// </exception>
            public ServerSentEvent ToEvent()
            {
                string type = EventTypeField.HasValue
                    ? EventTypeField.Value.Value.ToString()
                    : "message";

                char[] buffer = new char[DataLength];
                int offset = 0;
                foreach (ServerSentEventField dataField in DataFields)
                {
                    ReadOnlySpan<char> value = dataField.Value.Span;
                    value.CopyTo(buffer.AsSpan(offset));
                    buffer[offset + value.Length] = LF;
                    offset += value.Length + 1;
                }

                // Every field was followed by an LF; the last one is a separator
                // with nothing after it, so it is dropped.
                string data = new string(buffer, 0, buffer.Length - 1);
                return new ServerSentEvent(type, data);
            }
        }

        private readonly StreamReader _reader;

        /// <summary>
        /// Value of the last accepted <c>id</c> field. Starts as
        /// <see cref="string.Empty"/> and is updated as soon as an id line is
        /// processed, whether or not the surrounding event is dispatched.
        /// </summary>
        public string LastEventId { get; set; }

        /// <summary>
        /// Interval taken from the last accepted <c>retry</c> field (milliseconds).
        /// Starts as <see cref="Timeout.InfiniteTimeSpan"/>.
        /// </summary>
        public TimeSpan ReconnectionInterval { get; set; }

        /// <summary>Creates a reader over <paramref name="stream"/>.</summary>
        /// <param name="stream">The SSE stream to read lines from.</param>
        public ServerSentEventReader(Stream stream)
        {
            Argument.AssertNotNull(stream, nameof(stream));

            _reader = new StreamReader(stream);
            LastEventId = string.Empty;
            ReconnectionInterval = Timeout.InfiniteTimeSpan;
        }

        /// <summary>
        /// Synchronously reads lines until a complete event can be dispatched.
        /// </summary>
        /// <param name="cancellationToken">Checked before each line is read.</param>
        /// <returns>
        /// The next event, or <c>null</c> when the stream ends before an event is
        /// dispatched.
        /// </returns>
        /// <exception cref="OperationCanceledException">
        /// Thrown when <paramref name="cancellationToken"/> is cancelled.
        /// </exception>
        public ServerSentEvent? TryGetNextEvent(CancellationToken cancellationToken = default(CancellationToken))
        {
            PendingEvent pending = default(PendingEvent);

            while (true)
            {
                cancellationToken.ThrowIfCancellationRequested();

                string line = _reader.ReadLine();
                if (line == null)
                {
                    // End of input: any partially collected event is discarded.
                    return null;
                }

                ProcessLine(line, ref pending, out bool dispatch);
                if (dispatch)
                {
                    return pending.ToEvent();
                }
            }
        }

        // Decompiler stub for the async counterpart of TryGetNextEvent. It only
        // creates the compiler-generated state machine, hands it this reader and the
        // cancellation token, starts it, and returns its task. The method's actual
        // logic is inside that state machine type, which is not visible in this
        // listing, so the stub is kept exactly as decompiled.
        [AsyncStateMachine(typeof(<TryGetNextEventAsync>d__11))]
        public Task<ServerSentEvent?> TryGetNextEventAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            <TryGetNextEventAsync>d__11 stateMachine = default(<TryGetNextEventAsync>d__11);
            stateMachine.<>t__builder = AsyncTaskMethodBuilder<ServerSentEvent?>.Create();
            stateMachine.<>4__this = this;
            stateMachine.cancellationToken = cancellationToken;
            stateMachine.<>1__state = -1;
            stateMachine.<>t__builder.Start(ref stateMachine);
            return stateMachine.<>t__builder.Task;
        }

        /// <summary>
        /// Applies a single line of the stream to the pending event and to this
        /// reader's <see cref="LastEventId"/> / <see cref="ReconnectionInterval"/>.
        /// </summary>
        /// <param name="line">One line of input, without its line terminator.</param>
        /// <param name="pending">The event being accumulated; reset on a blank line when it has no data.</param>
        /// <param name="dispatch">Set to <c>true</c> when a blank line completes an event that has data.</param>
        private void ProcessLine(string line, ref PendingEvent pending, out bool dispatch)
        {
            dispatch = false;

            if (line.Length == 0)
            {
                // Blank line ends the current event. Without any data field there
                // is nothing to dispatch, so the accumulated state is thrown away.
                if (pending.DataLength == 0)
                {
                    pending = default(PendingEvent);
                }
                else
                {
                    dispatch = true;
                }

                return;
            }

            if (line[0] == ':')
            {
                // Lines starting with a colon are comments and are ignored.
                return;
            }

            ServerSentEventField field = new ServerSentEventField(line);
            switch (field.FieldType)
            {
                case ServerSentEventFieldKind.Event:
                    pending.EventTypeField = field;
                    break;

                case ServerSentEventFieldKind.Data:
                    // +1 reserves room for the LF that ToEvent writes after the value.
                    pending.DataLength += field.Value.Length + 1;
                    pending.DataFields.Add(field);
                    break;

                case ServerSentEventFieldKind.Id:
                    string id = field.Value.ToString();

                    // The SSE specification says an id containing U+0000 NULL must
                    // be ignored, leaving the previous last-event-id in place.
                    if (id.IndexOf('\0') < 0)
                    {
                        LastEventId = id;
                    }

                    break;

                case ServerSentEventFieldKind.Retry:
                    // The specification accepts the value only when it consists
                    // solely of ASCII digits. NumberStyles.None rejects signs and
                    // surrounding whitespace (so the interval can never become
                    // negative), and the invariant culture keeps the parse
                    // independent of the current thread's culture. Values that do
                    // not fit in an int are ignored, as before.
                    if (field.Value.Length > 0
                        && int.TryParse(field.Value.ToString(), NumberStyles.None, CultureInfo.InvariantCulture, out int milliseconds))
                    {
                        ReconnectionInterval = TimeSpan.FromMilliseconds((double)milliseconds);
                    }

                    break;
            }
        }
    }
}