File: Internal\Http3\Http3FrameWriter.cs
Web Access
Project: src\src\Servers\Kestrel\Core\src\Microsoft.AspNetCore.Server.Kestrel.Core.csproj (Microsoft.AspNetCore.Server.Kestrel.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Http;
using System.Net.Http.QPack;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3;
 
internal sealed class Http3FrameWriter
{
    // These bytes represent a ":status: 100" continue response header frame encoded with
    // QPACK. To arrive at this, we first take the index in the QPACK static table for status
    // 100 (https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#appendix-A), which
    // is 63, and encode it to get ff 00 (see QPackEncoder.EncodeStaticIndexedHeaderField).
    // The two zero bytes are for the section prefix
    // (https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#header-prefix)
    private static ReadOnlySpan<byte> ContinueBytes => [0x00, 0x00, 0xff, 0x00];
 
    // Size based on HTTP/2 default frame size
    private const int MaxDataFrameSize = 16 * 1024;
    private const int HeaderBufferSize = 16 * 1024;
 
    private readonly Lock _writeLock = new();
 
    private readonly int _maxTotalHeaderSize;
    private readonly ConnectionContext _connectionContext;
    private readonly ITimeoutControl _timeoutControl;
    private readonly MinDataRate? _minResponseDataRate;
    private readonly MemoryPool<byte> _memoryPool;
    private readonly KestrelTrace _log;
    private readonly IStreamIdFeature _streamIdFeature;
    private readonly IHttp3Stream _http3Stream;
    private readonly Http3RawFrame _outgoingFrame;
    private readonly TimingPipeFlusher _flusher;
 
    private PipeWriter _outputWriter = default!;
    private string _connectionId = default!;
 
    // HTTP/3 doesn't have a max frame size (peer can optionally specify a size).
    // Write headers to a buffer that can grow. Possible performance improvement
    // by writing directly to output writer (difficult as frame length is prefixed).
    private readonly ArrayBufferWriter<byte> _headerEncodingBuffer;
    private readonly Http3HeadersEnumerator _headersEnumerator = new();
    private int _headersTotalSize;
 
    private long _unflushedBytes;
    private bool _completed;
    private bool _aborted;
 
    public Http3FrameWriter(ConnectionContext connectionContext, ITimeoutControl timeoutControl, MinDataRate? minResponseDataRate, MemoryPool<byte> memoryPool, KestrelTrace log, IStreamIdFeature streamIdFeature, Http3PeerSettings clientPeerSettings, IHttp3Stream http3Stream)
    {
        _connectionContext = connectionContext;
        _timeoutControl = timeoutControl;
        _minResponseDataRate = minResponseDataRate;
        _memoryPool = memoryPool;
        _log = log;
        _streamIdFeature = streamIdFeature;
        _http3Stream = http3Stream;
        _outgoingFrame = new Http3RawFrame();
        _flusher = new TimingPipeFlusher(timeoutControl, log);
        _headerEncodingBuffer = new ArrayBufferWriter<byte>(HeaderBufferSize);
 
        // Note that max total header size value doesn't react to settings change during a stream.
        // Unlikely to be a problem in practice:
        // - Settings rarely change after the start of a connection.
        // - Response header size limits are a best-effort requirement in the spec.
        _maxTotalHeaderSize = clientPeerSettings.MaxRequestHeaderFieldSectionSize > int.MaxValue
            ? int.MaxValue
            : (int)clientPeerSettings.MaxRequestHeaderFieldSectionSize;
    }
 
    public void Reset(PipeWriter output, string connectionId)
    {
        _outputWriter = output;
        _flusher.Initialize(output);
        _connectionId = connectionId;
 
        _headersTotalSize = 0;
        _headerEncodingBuffer.Clear();
        _unflushedBytes = 0;
        _completed = false;
        _aborted = false;
    }
 
    internal Task WriteSettingsAsync(List<Http3PeerSetting> settings)
    {
        _outgoingFrame.PrepareSettings();
 
        // Calculate how long settings are before allocating.
 
        var settingsLength = CalculateSettingsSize(settings);
 
        // Call GetSpan with enough room for
        // - One encoded length int for setting size
        // - 1 byte for setting type
        // - settings length
        var buffer = _outputWriter.GetSpan(settingsLength + VariableLengthIntegerHelper.MaximumEncodedLength + 1);
 
        // Length start at 1 for type
        var totalLength = 1;
 
        // Write setting type
        buffer[0] = (byte)_outgoingFrame.Type;
        buffer = buffer[1..];
 
        // Write settings length
        var settingsBytesWritten = VariableLengthIntegerHelper.WriteInteger(buffer, settingsLength);
        buffer = buffer.Slice(settingsBytesWritten);
 
        totalLength += settingsBytesWritten + settingsLength;
 
        WriteSettings(settings, buffer);
 
        // Advance pipe writer and flush
        _outgoingFrame.Length = totalLength;
        _outputWriter.Advance(totalLength);
 
        return _outputWriter.FlushAsync().GetAsTask();
    }
 
    internal static int CalculateSettingsSize(List<Http3PeerSetting> settings)
    {
        var length = 0;
        foreach (var setting in settings)
        {
            length += VariableLengthIntegerHelper.GetByteCount((long)setting.Parameter);
            length += VariableLengthIntegerHelper.GetByteCount(setting.Value);
        }
        return length;
    }
 
    internal static void WriteSettings(List<Http3PeerSetting> settings, Span<byte> destination)
    {
        foreach (var setting in settings)
        {
            var parameterLength = VariableLengthIntegerHelper.WriteInteger(destination, (long)setting.Parameter);
            destination = destination.Slice(parameterLength);
 
            var valueLength = VariableLengthIntegerHelper.WriteInteger(destination, (long)setting.Value);
            destination = destination.Slice(valueLength);
        }
    }
 
    internal Task WriteStreamIdAsync(long id)
    {
        var buffer = _outputWriter.GetSpan(8);
        _outputWriter.Advance(VariableLengthIntegerHelper.WriteInteger(buffer, id));
        return _outputWriter.FlushAsync().GetAsTask();
    }
 
    public ValueTask<FlushResult> WriteDataAsync(in ReadOnlySequence<byte> data)
    {
        // The Length property of a ReadOnlySequence can be expensive, so we cache the value.
        var dataLength = data.Length;
 
        lock (_writeLock)
        {
            if (_completed)
            {
                return default;
            }
 
            WriteDataUnsynchronized(data, dataLength);
            return TimeFlushUnsynchronizedAsync();
        }
    }
 
    private void WriteDataUnsynchronized(in ReadOnlySequence<byte> data, long dataLength)
    {
        Debug.Assert(dataLength == data.Length);
 
        _outgoingFrame.PrepareData();
 
        if (dataLength > MaxDataFrameSize)
        {
            SplitAndWriteDataUnsynchronized(in data, dataLength);
            return;
        }
 
        _outgoingFrame.Length = (int)dataLength;
 
        WriteHeaderUnsynchronized();
 
        foreach (var buffer in data)
        {
            _outputWriter.Write(buffer.Span);
        }
 
        return;
 
        void SplitAndWriteDataUnsynchronized(in ReadOnlySequence<byte> data, long dataLength)
        {
            Debug.Assert(dataLength == data.Length);
 
            var dataPayloadLength = (int)MaxDataFrameSize;
 
            Debug.Assert(dataLength > dataPayloadLength);
 
            var remainingData = data;
            do
            {
                var currentData = remainingData.Slice(0, dataPayloadLength);
                _outgoingFrame.Length = dataPayloadLength;
 
                WriteHeaderUnsynchronized();
 
                foreach (var buffer in currentData)
                {
                    _outputWriter.Write(buffer.Span);
                }
 
                dataLength -= dataPayloadLength;
                remainingData = remainingData.Slice(dataPayloadLength);
 
            } while (dataLength > dataPayloadLength);
 
            _outgoingFrame.Length = (int)dataLength;
 
            WriteHeaderUnsynchronized();
 
            foreach (var buffer in remainingData)
            {
                _outputWriter.Write(buffer.Span);
            }
        }
    }
 
    internal ValueTask<FlushResult> WriteGoAway(long id)
    {
        _outgoingFrame.PrepareGoAway();
 
        var length = VariableLengthIntegerHelper.GetByteCount(id);
 
        _outgoingFrame.Length = length;
 
        WriteHeaderUnsynchronized();
 
        var buffer = _outputWriter.GetSpan(8);
        VariableLengthIntegerHelper.WriteInteger(buffer, id);
        _outputWriter.Advance(length);
        return _outputWriter.FlushAsync();
    }
 
    private void WriteHeaderUnsynchronized()
    {
        _log.Http3FrameSending(_connectionId, _streamIdFeature.StreamId, _outgoingFrame);
        var headerLength = WriteHeader(_outgoingFrame.Type, _outgoingFrame.Length, _outputWriter);
 
        // We assume the payload will be written prior to the next flush.
        _unflushedBytes += headerLength + _outgoingFrame.Length;
    }
 
    public ValueTask<FlushResult> Write100ContinueAsync()
    {
        lock (_writeLock)
        {
            if (_completed)
            {
                return default;
            }
 
            _outgoingFrame.PrepareHeaders();
            _outgoingFrame.Length = ContinueBytes.Length;
            WriteHeaderUnsynchronized();
            _outputWriter.Write(ContinueBytes);
            return TimeFlushUnsynchronizedAsync();
        }
    }
 
    internal static int WriteHeader(Http3FrameType frameType, long frameLength, PipeWriter output)
    {
        // max size of the header is 16, most likely it will be smaller.
        var buffer = output.GetSpan(16);
 
        var typeLength = VariableLengthIntegerHelper.WriteInteger(buffer, (int)frameType);
 
        buffer = buffer.Slice(typeLength);
 
        var lengthLength = VariableLengthIntegerHelper.WriteInteger(buffer, (int)frameLength);
 
        var totalLength = typeLength + lengthLength;
        output.Advance(typeLength + lengthLength);
 
        return totalLength;
    }
 
    public ValueTask<FlushResult> WriteResponseTrailersAsync(long streamId, HttpResponseTrailers headers)
    {
        lock (_writeLock)
        {
            if (_completed)
            {
                return default;
            }
 
            try
            {
                _headersEnumerator.Initialize(headers);
                _headersTotalSize = 0;
                _headerEncodingBuffer.Clear();
 
                _outgoingFrame.PrepareHeaders();
                var buffer = _headerEncodingBuffer.GetSpan(HeaderBufferSize);
                var done = QPackHeaderWriter.BeginEncodeHeaders(_headersEnumerator, buffer, ref _headersTotalSize, out var payloadLength);
                FinishWritingHeaders(payloadLength, done);
            }
            // Any exception from the QPack encoder can leave the dynamic table in a corrupt state.
            // Since we allow custom header encoders we don't know what type of exceptions to expect.
            catch (Exception ex)
            {
                _log.QPackEncodingError(_connectionId, streamId, ex);
                _connectionContext.Abort(new ConnectionAbortedException(ex.Message, ex));
                _http3Stream.Abort(new ConnectionAbortedException(ex.Message, ex), Http3ErrorCode.InternalError);
            }
 
            return TimeFlushUnsynchronizedAsync();
        }
    }
 
    private ValueTask<FlushResult> TimeFlushUnsynchronizedAsync()
    {
        var bytesWritten = _unflushedBytes;
        _unflushedBytes = 0;
 
        return _flusher.FlushAsync(_minResponseDataRate, bytesWritten);
    }
 
    public ValueTask<FlushResult> FlushAsync(IHttpOutputAborter? outputAborter, CancellationToken cancellationToken)
    {
        lock (_writeLock)
        {
            if (_completed)
            {
                return default;
            }
 
            var bytesWritten = _unflushedBytes;
            _unflushedBytes = 0;
 
            return _flusher.FlushAsync(_minResponseDataRate, bytesWritten, outputAborter, cancellationToken);
        }
    }
 
    internal void WriteResponseHeaders(int statusCode, HttpResponseHeaders headers)
    {
        lock (_writeLock)
        {
            if (_completed)
            {
                return;
            }
 
            try
            {
                _headersEnumerator.Initialize(headers);
 
                _outgoingFrame.PrepareHeaders();
                var buffer = _headerEncodingBuffer.GetSpan(HeaderBufferSize);
                var done = QPackHeaderWriter.BeginEncodeHeaders(statusCode, _headersEnumerator, buffer, ref _headersTotalSize, out var payloadLength);
                FinishWritingHeaders(payloadLength, done);
            }
            // Any exception from the QPack encoder can leave the dynamic table in a corrupt state.
            // Since we allow custom header encoders we don't know what type of exceptions to expect.
            catch (Exception ex)
            {
                _log.QPackEncodingError(_connectionId, _http3Stream.StreamId, ex);
                _connectionContext.Abort(new ConnectionAbortedException(ex.Message, ex));
                _http3Stream.Abort(new ConnectionAbortedException(ex.Message, ex), Http3ErrorCode.InternalError);
                throw new InvalidOperationException(ex.Message, ex); // Report the error to the user if this was the first write.
            }
        }
    }
 
    private void FinishWritingHeaders(int payloadLength, bool done)
    {
        _headerEncodingBuffer.Advance(payloadLength);
 
        while (!done)
        {
            ValidateHeadersTotalSize();
            var buffer = _headerEncodingBuffer.GetSpan(HeaderBufferSize);
            done = QPackHeaderWriter.Encode(_headersEnumerator!, buffer, ref _headersTotalSize, out payloadLength);
            _headerEncodingBuffer.Advance(payloadLength);
        }
 
        ValidateHeadersTotalSize();
 
        _outgoingFrame.Length = _headerEncodingBuffer.WrittenCount;
        WriteHeaderUnsynchronized();
        _outputWriter.Write(_headerEncodingBuffer.WrittenSpan);
 
        void ValidateHeadersTotalSize()
        {
            // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-4.1.1.3
            if (_headersTotalSize > _maxTotalHeaderSize)
            {
                throw new QPackEncodingException($"The encoded HTTP headers length exceeds the limit specified by the peer of {_maxTotalHeaderSize} bytes.");
            }
        }
    }
 
    public ValueTask CompleteAsync()
    {
        lock (_writeLock)
        {
            if (_completed)
            {
                return default;
            }
 
            _completed = true;
            return _outputWriter.CompleteAsync();
        }
    }
 
    public void Abort(ConnectionAbortedException error)
    {
        lock (_writeLock)
        {
            if (_aborted)
            {
                return;
            }
 
            _aborted = true;
            _connectionContext.Abort(error);
 
            if (_completed)
            {
                return;
            }
 
            _completed = true;
            _outputWriter.Complete();
        }
    }
}