// File: ServerSentEventsMessageFormatter.cs
// Web Access
// Project: src\aspnetcore\src\SignalR\common\Http.Connections\src\Microsoft.AspNetCore.Http.Connections.csproj (Microsoft.AspNetCore.Http.Connections)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;

namespace Microsoft.AspNetCore.Http.Connections;

/// <summary>
/// Writes payloads in the Server-Sent Events wire format: each line of the payload is emitted as a
/// <c>data: </c> field terminated by CRLF, and the message is closed by one additional CRLF.
/// </summary>
internal static class ServerSentEventsMessageFormatter
{
    private static readonly ReadOnlyMemory<byte> DataPrefix = new[] { (byte)'d', (byte)'a', (byte)'t', (byte)'a', (byte)':', (byte)' ' };
    private static readonly ReadOnlyMemory<byte> Newline = new[] { (byte)'\r', (byte)'\n' };

    private const byte LineFeed = (byte)'\n';
    private const byte CarriageReturn = (byte)'\r';

    /// <summary>
    /// Formats <paramref name="payload"/> as a single message and writes it to <paramref name="output"/>.
    /// </summary>
    /// <param name="payload">The raw message bytes. May span multiple segments and may contain line breaks.</param>
    /// <param name="output">The stream that receives the formatted message.</param>
    /// <param name="token">Token observed by the writes to <paramref name="output"/>.</param>
    /// <returns>A task that completes once the formatted message has been written.</returns>
    public static async Task WriteMessageAsync(ReadOnlySequence<byte> payload, Stream output, CancellationToken token)
    {
        // Fast path: without a line feed the payload is a single line and can go straight to the output.
        if (payload.PositionOf(LineFeed) is null)
        {
            // An empty payload produces no data field at all, only the terminating blank line.
            if (payload.Length > 0)
            {
                await output.WriteAsync(DataPrefix, token);
                await output.WriteAsync(payload, token);
                await output.WriteAsync(Newline, token);
            }

            await output.WriteAsync(Newline, token);
            return;
        }

        // Multi-line payload: format it into an in-memory buffer first, then copy the result out.
        using var ms = new MemoryStream();

        await WriteMessageToMemory(ms, payload);
        ms.Position = 0;

        await ms.CopyToAsync(output, token);
    }

    /// <summary>
    /// Gets the last memory segment in a sequence.
    /// </summary>
    /// <param name="source">Source sequence.</param>
    /// <param name="offset">The offset, from the start of <paramref name="source"/>, at which the returned segment begins.</param>
    /// <returns>The last memory segment in a sequence.</returns>
    /// <exception cref="InvalidOperationException">No segment reaching the end of the sequence was found.</exception>
    private static ReadOnlyMemory<byte> GetLastSegment(in ReadOnlySequence<byte> source, out long offset)
    {
        offset = 0;

        var totalLength = source.Length;
        var position = source.Start;
        while (source.TryGet(ref position, out ReadOnlyMemory<byte> memory))
        {
            // The first segment whose end reaches the total length is the last one holding data.
            if (offset + memory.Length >= totalLength)
            {
                return memory;
            }

            offset += memory.Length;
        }

        throw new InvalidOperationException("Could not get last segment from sequence.");
    }

    /// <summary>
    /// Splits <paramref name="payload"/> on line feeds and writes every line to <paramref name="output"/>
    /// as a <c>data: </c> field followed by CRLF, then writes the terminating CRLF.
    /// A carriage return directly preceding a line feed is dropped, so CRLF and LF line breaks
    /// in the payload produce the same output.
    /// </summary>
    /// <param name="output">The stream that receives the formatted lines.</param>
    /// <param name="payload">The raw message bytes.</param>
    /// <returns>A task that completes once all lines and the terminator have been written.</returns>
    private static async Task WriteMessageToMemory(Stream output, ReadOnlySequence<byte> payload)
    {
        var keepWriting = true;
        while (keepWriting)
        {
            var sliceEnd = payload.PositionOf(LineFeed);

            ReadOnlySequence<byte> lineSegment;
            if (sliceEnd is null)
            {
                // No further line feed: whatever remains (possibly nothing) is the final line.
                lineSegment = payload;
                payload = ReadOnlySequence<byte>.Empty;
                keepWriting = false;
            }
            else
            {
                // Everything up to, but not including, the line feed.
                lineSegment = payload.Slice(payload.Start, sliceEnd.Value);

                // Only an empty line has no last byte to inspect. A line consisting solely of '\r'
                // (a blank CRLF line in the payload) must be trimmed as well, otherwise the stray
                // carriage return would be written into the data field.
                if (!lineSegment.IsEmpty)
                {
                    // Check if the line ended in \r\n. If it did then trim the \r
                    var memory = GetLastSegment(lineSegment, out var offset);
                    if (memory.Span[^1] == CarriageReturn)
                    {
                        lineSegment = lineSegment.Slice(lineSegment.Start, offset + memory.Length - 1);
                    }
                }

                // Update payload to remove \n
                payload = payload.Slice(payload.GetPosition(1, sliceEnd.Value));
            }

            // Write line
            await output.WriteAsync(DataPrefix);
            await output.WriteAsync(lineSegment);
            await output.WriteAsync(Newline);
        }

        // Blank line that terminates the message.
        await output.WriteAsync(Newline);
    }
}