File: Internal\Http3\Http3ControlStream.cs
Web Access
Project: src\src\Servers\Kestrel\Core\src\Microsoft.AspNetCore.Server.Kestrel.Core.csproj (Microsoft.AspNetCore.Server.Kestrel.Core)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Globalization;
using System.IO.Pipelines;
using System.Net.Http;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Connections.Features;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Http.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
 
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3;
 
internal abstract class Http3ControlStream : IHttp3Stream, IThreadPoolWorkItem
{
    // Unidirectional stream type identifiers compared against the stream header in ProcessRequestAsync.
    private const int ControlStreamTypeId = 0;
    private const int EncoderStreamTypeId = 2;
    private const int DecoderStreamTypeId = 3;

    // Writes outbound frames (stream id, SETTINGS, GOAWAY) to the transport output.
    private readonly Http3FrameWriter _frameWriter;
    private readonly Http3StreamContext _context;
    // Settings advertised by the server; their non-default values are sent in ProcessOutboundSendsAsync.
    private readonly Http3PeerSettings _serverPeerSettings;
    private readonly IStreamIdFeature _streamIdFeature;
    private readonly IStreamClosedFeature _streamClosedFeature;
    private readonly IProtocolErrorCodeFeature _errorCodeFeature;
    // Single frame instance reused for every frame parsed in HandleControlStream.
    private readonly Http3RawFrame _incomingFrame = new Http3RawFrame();
    // 0 while open; switched to 1 by TryClose via Interlocked.Exchange. The read loops poll it.
    private volatile int _isClosed;
    // Stream type from the stream header; -1 until a type has been read.
    private long _headerType;
    // Guards updates to _completionState (see ApplyCompletionFlag and Abort).
    private readonly Lock _completionLock = new();

    // Set when the first SETTINGS frame is processed; a second SETTINGS frame is a connection error.
    private bool _haveReceivedSettingsFrame;
    // Bit set of completion flags; only modified through ApplyCompletionFlag.
    private StreamCompletionFlags _completionState;

    /// <summary>True when the EndStreamReceived flag is set in the completion state.</summary>
    public bool EndStreamReceived => (_completionState & StreamCompletionFlags.EndStreamReceived) == StreamCompletionFlags.EndStreamReceived;
    /// <summary>True when the Aborted flag is set in the completion state.</summary>
    public bool IsAborted => (_completionState & StreamCompletionFlags.Aborted) == StreamCompletionFlags.Aborted;
    /// <summary>True when the Completed flag is set in the completion state.</summary>
    public bool IsCompleted => (_completionState & StreamCompletionFlags.Completed) == StreamCompletionFlags.Completed;

    /// <summary>Stream id reported by the connection's <see cref="IStreamIdFeature"/>.</summary>
    public long StreamId => _streamIdFeature.StreamId;
 
    /// <summary>
    /// Captures the connection features this stream depends on and creates the frame writer
    /// bound to the transport output.
    /// </summary>
    /// <param name="context">Per-stream context supplying settings, features, transport and services.</param>
    /// <param name="headerType">Stream type when already known; <c>null</c> stores -1 so it is read later.</param>
    public Http3ControlStream(Http3StreamContext context, long? headerType)
    {
        var limits = context.ServiceContext.ServerOptions.Limits;

        _context = context;
        _serverPeerSettings = context.ServerPeerSettings;

        // All three features are required; a missing one fails construction.
        var features = context.ConnectionFeatures;
        _streamIdFeature = features.GetRequiredFeature<IStreamIdFeature>();
        _streamClosedFeature = features.GetRequiredFeature<IStreamClosedFeature>();
        _errorCodeFeature = features.GetRequiredFeature<IProtocolErrorCodeFeature>();

        // -1 marks the stream type as not yet read.
        _headerType = headerType.GetValueOrDefault(-1);

        var writer = new Http3FrameWriter(
            context.StreamContext,
            context.TimeoutControl,
            limits.MinResponseDataRate,
            context.MemoryPool,
            context.ServiceContext.Log,
            _streamIdFeature,
            context.ClientPeerSettings,
            this);
        _frameWriter = writer;
        _frameWriter.Reset(context.Transport.Output, context.ConnectionId);
    }
 
    /// <summary>
    /// Stream-closed callback target: records the Completed flag.
    /// </summary>
    private void OnStreamClosed() => ApplyCompletionFlag(StreamCompletionFlags.Completed);
 
    /// <summary>Reader side of the stream's transport.</summary>
    public PipeReader Input => _context.Transport.Input;
    /// <summary>Trace logger taken from the service context.</summary>
    public KestrelTrace Log => _context.ServiceContext.Log;

    /// <summary>Timeout timestamp slot; not read or written inside this class (presumably managed by the owner - confirm).</summary>
    public long StreamTimeoutTimestamp { get; set; }
    /// <summary>True while the stream type is still -1, i.e. no stream header type has been stored.</summary>
    public bool IsReceivingHeader => _headerType == -1;
    /// <summary>Always false for this stream kind.</summary>
    public bool IsDraining => false;
    /// <summary>Always false for this stream kind.</summary>
    public bool IsRequestStream => false;
    /// <summary>Connection id of the underlying stream context, used when logging.</summary>
    public string TraceIdentifier => _context.StreamContext.ConnectionId;
 
    /// <summary>
    /// Aborts the stream once: records the Aborted flag, logs, publishes the error code,
    /// aborts the frame writer and completes the input with the abort reason.
    /// </summary>
    /// <param name="abortReason">Exception describing why the stream is being aborted.</param>
    /// <param name="errorCode">HTTP/3 error code stored on the protocol error code feature.</param>
    public void Abort(ConnectionAbortedException abortReason, Http3ErrorCode errorCode)
    {
        lock (_completionLock)
        {
            // Nothing to do when the stream already finished or an abort was already recorded.
            var alreadyFinished = IsCompleted || IsAborted;
            if (alreadyFinished)
            {
                return;
            }

            var transition = ApplyCompletionFlag(StreamCompletionFlags.Aborted);
            if (transition.OldState == transition.NewState)
            {
                // The flag was already present, so the abort work has been done before.
                return;
            }

            Log.Http3StreamAbort(TraceIdentifier, errorCode, abortReason);

            _errorCodeFeature.Error = (long)errorCode;
            _frameWriter.Abort(abortReason);

            Input.Complete(abortReason);
        }
    }
 
    /// <summary>
    /// Closes the stream (at most once) when either side of the transport has completed.
    /// </summary>
    public void OnInputOrOutputCompleted() => TryClose();
 
    /// <summary>
    /// Marks the stream closed and completes the input reader. Only the first caller does the work.
    /// </summary>
    /// <returns><c>true</c> if this call closed the stream; <c>false</c> if it was already closed.</returns>
    private bool TryClose()
    {
        var wasAlreadyClosed = Interlocked.Exchange(ref _isClosed, 1) != 0;
        if (wasAlreadyClosed)
        {
            return false;
        }

        Input.Complete();
        return true;
    }
 
    /// <summary>
    /// Adds the given flag(s) to the completion state under the completion lock.
    /// </summary>
    /// <param name="completionState">Flag(s) to OR into the current state.</param>
    /// <returns>The state before and after the update; equal values mean nothing changed.</returns>
    private (StreamCompletionFlags OldState, StreamCompletionFlags NewState) ApplyCompletionFlag(StreamCompletionFlags completionState)
    {
        lock (_completionLock)
        {
            var before = _completionState;
            var after = before | completionState;
            _completionState = after;

            return (before, after);
        }
    }
 
    /// <summary>
    /// Starts the outbound side of the stream: registers the stream-closed callback, then writes
    /// the given id followed by the server's non-default settings.
    /// </summary>
    /// <param name="id">Value passed to the frame writer's stream id write.</param>
    internal async ValueTask ProcessOutboundSendsAsync(long id)
    {
        // Static lambda with explicit state so no closure is captured.
        _streamClosedFeature.OnClosed(
            static state => ((Http3ControlStream)state!).OnStreamClosed(),
            this);

        await _frameWriter.WriteStreamIdAsync(id);

        var settings = _serverPeerSettings.GetNonProtocolDefaults();
        await _frameWriter.WriteSettingsAsync(settings);
    }
 
    /// <summary>
    /// Logs and writes a GOAWAY frame carrying the given id.
    /// </summary>
    /// <param name="id">Identifier placed in the GOAWAY frame.</param>
    /// <returns>The pending flush of the GOAWAY frame.</returns>
    internal ValueTask<FlushResult> SendGoAway(long id)
    {
        var connectionId = _context.ConnectionId;
        Log.Http3GoAwayStreamId(connectionId, id);

        return _frameWriter.WriteGoAway(id);
    }
 
    /// <summary>
    /// Reads the variable-length integer at the start of the stream that identifies its type.
    /// </summary>
    /// <returns>
    /// The decoded stream type, or -1 when the input completes or the stream is closed
    /// before a full integer is available.
    /// </returns>
    private async ValueTask<long> TryReadStreamHeaderAsync()
    {
        // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6.2
        while (_isClosed == 0)
        {
            var readResult = await Input.ReadAsync();
            var buffer = readResult.Buffer;

            // Default: consume nothing but mark everything examined so the next read waits for more data.
            var consumed = buffer.Start;
            var examined = buffer.End;

            try
            {
                long streamType = -1;
                if (!buffer.IsEmpty)
                {
                    streamType = VariableLengthIntegerHelper.GetInteger(buffer, out consumed, out examined);
                }

                // Either a type was decoded, or no more data will arrive and -1 is reported.
                if (streamType != -1 || readResult.IsCompleted)
                {
                    return streamType;
                }
            }
            finally
            {
                Input.AdvanceTo(consumed, examined);
            }
        }

        return -1;
    }
 
    /// <summary>
    /// Runs the inbound side of the stream: reads the stream type if needed, registers the stream
    /// with the lifetime handler according to that type and processes it until it ends.
    /// </summary>
    /// <remarks>
    /// Stream errors abort this stream; connection errors are forwarded to the lifetime handler.
    /// In every case the stream is flagged Completed and the handler is notified at the end.
    /// </remarks>
    /// <param name="application">Not used by this method's body.</param>
    public async Task ProcessRequestAsync<TContext>(IHttpApplication<TContext> application) where TContext : notnull
    {
        try
        {
            // todo: the _headerType should be read earlier
            // and by the Http3PendingStream. However, to
            // avoid perf issues with the current implementation
            // we can defer the reading until now
            // (https://github.com/dotnet/aspnetcore/issues/42789)
            if (IsReceivingHeader)
            {
                _headerType = await TryReadStreamHeaderAsync();
            }

            _context.StreamLifetimeHandler.OnStreamHeaderReceived(this);

            if (_headerType == ControlStreamTypeId)
            {
                if (!_context.StreamLifetimeHandler.OnInboundControlStream(this))
                {
                    // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6.2.1
                    throw CreateDuplicateStreamError("control");
                }

                await HandleControlStream();
            }
            else if (_headerType == EncoderStreamTypeId)
            {
                if (!_context.StreamLifetimeHandler.OnInboundEncoderStream(this))
                {
                    // https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#section-4.2
                    throw CreateDuplicateStreamError("encoder");
                }

                await HandleEncodingDecodingTask();
            }
            else if (_headerType == DecoderStreamTypeId)
            {
                if (!_context.StreamLifetimeHandler.OnInboundDecoderStream(this))
                {
                    // https://quicwg.org/base-drafts/draft-ietf-quic-qpack.html#section-4.2
                    throw CreateDuplicateStreamError("decoder");
                }

                await HandleEncodingDecodingTask();
            }
            else
            {
                // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-6.2-6
                throw new Http3StreamErrorException(CoreStrings.FormatHttp3ControlStreamErrorUnsupportedType(_headerType), Http3ErrorCode.StreamCreationError);
            }
        }
        catch (Http3StreamErrorException ex)
        {
            Abort(new ConnectionAbortedException(ex.Message), ex.ErrorCode);
        }
        catch (Http3ConnectionErrorException ex)
        {
            _errorCodeFeature.Error = (long)ex.ErrorCode;
            _context.StreamLifetimeHandler.OnStreamConnectionError(ex);
        }
        finally
        {
            ApplyCompletionFlag(StreamCompletionFlags.Completed);
            _context.StreamLifetimeHandler.OnStreamCompleted(this);
        }

        // Builds the connection error raised when the lifetime handler rejects an inbound stream of the given kind.
        static Http3ConnectionErrorException CreateDuplicateStreamError(string streamKind)
        {
            return new Http3ConnectionErrorException(CoreStrings.FormatHttp3ControlStreamErrorMultipleInboundStreams(streamKind), Http3ErrorCode.StreamCreationError, ConnectionEndReason.StreamCreationError);
        }
    }
 
    /// <summary>
    /// Read loop for the inbound control stream: parses every complete frame in the buffered
    /// input and dispatches it, until the input completes or the stream is closed.
    /// </summary>
    private async Task HandleControlStream()
    {
        while (_isClosed == 0)
        {
            var result = await Input.ReadAsync();
            var readableBuffer = result.Buffer;
            // Default when no full frame is parsed: consume nothing, mark everything examined.
            var consumed = readableBuffer.Start;
            var examined = readableBuffer.End;

            try
            {
                if (!readableBuffer.IsEmpty)
                {
                    // need to kick off httpprotocol process request async here.
                    while (Http3FrameReader.TryReadFrame(ref readableBuffer, _incomingFrame, out var framePayload))
                    {
                        Log.Http3FrameReceived(_context.ConnectionId, _streamIdFeature.StreamId, _incomingFrame);

                        // Record progress before processing so the finally block advances past this
                        // frame even when processing throws.
                        consumed = examined = framePayload.End;
                        await ProcessHttp3ControlStream(framePayload);
                    }
                }

                if (result.IsCompleted)
                {
                    return;
                }
            }
            finally
            {
                // Always hand the buffer back to the reader, including on exceptions.
                Input.AdvanceTo(consumed, examined);
            }
        }
    }
 
    /// <summary>
    /// Read loop for inbound encoder and decoder streams: discards everything the peer sends
    /// until the input completes or the stream is closed.
    /// </summary>
    private async ValueTask HandleEncodingDecodingTask()
    {
        // Noop encoding and decoding task. Settings make it so we don't need to read content of encoder and decoder.
        // An endpoint MUST allow its peer to create an encoder stream and a
        // decoder stream even if the connection's settings prevent their use.

        while (_isClosed == 0)
        {
            var result = await Input.ReadAsync();
            var readableBuffer = result.Buffer;

            // Drop the data: everything is consumed, nothing is inspected.
            Input.AdvanceTo(readableBuffer.End);

            // Once the input has completed, every further ReadAsync returns immediately with a
            // completed result. Exit here (like the other read loops in this class) instead of
            // spinning until _isClosed changes.
            if (result.IsCompleted)
            {
                return;
            }
        }
    }
 
    /// <summary>
    /// Dispatches the frame currently held in the incoming frame instance to its handler.
    /// </summary>
    /// <param name="payload">Payload bytes of the current frame.</param>
    /// <returns>The handler's pending work.</returns>
    /// <exception cref="Http3ConnectionErrorException">
    /// A DATA, HEADERS or PUSH_PROMISE frame was received on the control stream.
    /// </exception>
    private ValueTask ProcessHttp3ControlStream(in ReadOnlySequence<byte> payload)
    {
        var frameType = _incomingFrame.Type;

        return frameType switch
        {
            // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-7.2
            Http3FrameType.Data or Http3FrameType.Headers or Http3FrameType.PushPromise =>
                throw new Http3ConnectionErrorException(CoreStrings.FormatHttp3ErrorUnsupportedFrameOnControlStream(_incomingFrame.FormattedType), Http3ErrorCode.UnexpectedFrame, ConnectionEndReason.UnexpectedFrame),
            Http3FrameType.Settings => ProcessSettingsFrameAsync(payload),
            Http3FrameType.GoAway => ProcessGoAwayFrameAsync(),
            Http3FrameType.CancelPush => ProcessCancelPushFrameAsync(),
            Http3FrameType.MaxPushId => ProcessMaxPushIdFrameAsync(),
            _ => ProcessUnknownFrameAsync(frameType),
        };
    }
 
    /// <summary>
    /// Handles a SETTINGS frame: rejects a second one, registers the stream-closed callback and
    /// applies each id/value pair found in the payload.
    /// </summary>
    /// <param name="payload">SETTINGS payload, a sequence of variable-length integer id/value pairs.</param>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    /// <exception cref="Http3ConnectionErrorException">A SETTINGS frame was already received.</exception>
    private ValueTask ProcessSettingsFrameAsync(ReadOnlySequence<byte> payload)
    {
        if (_haveReceivedSettingsFrame)
        {
            // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-settings
            throw new Http3ConnectionErrorException(CoreStrings.Http3ErrorControlStreamMultipleSettingsFrames, Http3ErrorCode.UnexpectedFrame, ConnectionEndReason.UnexpectedFrame);
        }

        _haveReceivedSettingsFrame = true;
        _streamClosedFeature.OnClosed(
            static state => ((Http3ControlStream)state!).OnStreamClosed(),
            this);

        // Parsing stops at the first integer that cannot be decoded; a setting is applied
        // only when both its id and its value were read.
        while (TryReadInteger(ref payload, out var settingId) && TryReadInteger(ref payload, out var settingValue))
        {
            ProcessSetting(settingId, settingValue);
        }

        return default;

        // Decodes one variable-length integer from the front of the buffer and slices it off on success.
        static bool TryReadInteger(ref ReadOnlySequence<byte> buffer, out long value)
        {
            value = VariableLengthIntegerHelper.GetInteger(buffer, out var consumed, out _);
            if (value == -1)
            {
                return false;
            }

            buffer = buffer.Slice(consumed);
            return true;
        }
    }
 
    /// <summary>
    /// Applies one setting received from the client.
    /// </summary>
    /// <param name="id">Setting identifier.</param>
    /// <param name="value">Setting value.</param>
    /// <exception cref="Http3ConnectionErrorException">The identifier is one of the reserved ids 0x0, 0x2, 0x3, 0x4 or 0x5.</exception>
    private void ProcessSetting(long id, long value)
    {
        // These are client settings, for outbound traffic.
        if (id is 0x0 or 0x2 or 0x3 or 0x4 or 0x5)
        {
            // HTTP/2 settings are reserved.
            // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-7.2.4.1-5
            var formattedId = "0x" + id.ToString("X", CultureInfo.InvariantCulture);
            var message = CoreStrings.FormatHttp3ErrorControlStreamReservedSetting(formattedId);
            throw new Http3ConnectionErrorException(message, Http3ErrorCode.SettingsError, ConnectionEndReason.InvalidSettings);
        }

        switch (id)
        {
            case (long)Http3SettingType.QPackMaxTableCapacity:
            case (long)Http3SettingType.MaxFieldSectionSize:
            case (long)Http3SettingType.QPackBlockedStreams:
            case (long)Http3SettingType.EnableWebTransport:
            case (long)Http3SettingType.H3Datagram:
                // Recognized settings are forwarded to the lifetime handler.
                _context.StreamLifetimeHandler.OnInboundControlStreamSetting((Http3SettingType)id, value);
                return;
            default:
                // Ignore all unknown settings.
                // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-7.2.4
                return;
        }
    }
 
    /// <summary>
    /// Handles a GOAWAY frame from the client: stops the connection from processing further
    /// requests and requests a close through the lifetime notification feature when present.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    /// <exception cref="Http3ConnectionErrorException">No SETTINGS frame has been received yet.</exception>
    private ValueTask ProcessGoAwayFrameAsync()
    {
        EnsureSettingsFrame(Http3FrameType.GoAway);

        // StopProcessingNextRequest must be called before RequestClose to ensure it's considered client initiated.
        _context.Connection.StopProcessingNextRequest(serverInitiated: false, ConnectionEndReason.ClientGoAway);

        var closeNotification = _context.ConnectionContext.Features.Get<IConnectionLifetimeNotificationFeature>();
        closeNotification?.RequestClose();

        // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-goaway
        // PUSH is not implemented so nothing to do.

        // TODO: Double check the connection remains open.
        return ValueTask.CompletedTask;
    }
 
    /// <summary>
    /// Handles a CANCEL_PUSH frame. Only validates that SETTINGS was received first.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    /// <exception cref="Http3ConnectionErrorException">No SETTINGS frame has been received yet.</exception>
    private ValueTask ProcessCancelPushFrameAsync()
    {
        EnsureSettingsFrame(Http3FrameType.CancelPush);

        // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-cancel_push
        // PUSH is not implemented so nothing to do.
        return ValueTask.CompletedTask;
    }
 
    /// <summary>
    /// Handles a MAX_PUSH_ID frame. Only validates that SETTINGS was received first.
    /// </summary>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    /// <exception cref="Http3ConnectionErrorException">No SETTINGS frame has been received yet.</exception>
    private ValueTask ProcessMaxPushIdFrameAsync()
    {
        EnsureSettingsFrame(Http3FrameType.MaxPushId);

        // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#name-max_push_id
        // PUSH is not implemented so nothing to do.
        return ValueTask.CompletedTask;
    }
 
    /// <summary>
    /// Handles a frame type with no dedicated handler. Only validates that SETTINGS was received first.
    /// </summary>
    /// <param name="frameType">Type of the received frame, used in the error message.</param>
    /// <returns>A completed <see cref="ValueTask"/>.</returns>
    /// <exception cref="Http3ConnectionErrorException">No SETTINGS frame has been received yet.</exception>
    private ValueTask ProcessUnknownFrameAsync(Http3FrameType frameType)
    {
        EnsureSettingsFrame(frameType);

        // https://quicwg.org/base-drafts/draft-ietf-quic-http.html#section-9
        // Unknown frames must be explicitly ignored.
        return ValueTask.CompletedTask;
    }
 
    /// <summary>
    /// Verifies that a SETTINGS frame has been received before any other control-stream frame.
    /// </summary>
    /// <param name="frameType">Type of the frame being processed, used in the error message.</param>
    /// <exception cref="Http3ConnectionErrorException">No SETTINGS frame has been received yet.</exception>
    private void EnsureSettingsFrame(Http3FrameType frameType)
    {
        if (_haveReceivedSettingsFrame)
        {
            return;
        }

        var formattedType = Http3Formatting.ToFormattedType(frameType);
        var message = CoreStrings.FormatHttp3ErrorControlStreamFrameReceivedBeforeSettings(formattedType);
        throw new Http3ConnectionErrorException(message, Http3ErrorCode.MissingSettings, ConnectionEndReason.InvalidSettings);
    }
 
    /// <summary>
    /// Used to kick off the request processing loop by derived classes.
    /// </summary>
    /// <remarks>
    /// Abstract here; it is the member that satisfies <see cref="IThreadPoolWorkItem"/>,
    /// which this class declares.
    /// </remarks>
    public abstract void Execute();
 
    // Integer codes naming which side initiated a graceful close.
    // NOTE(review): nothing in this file references these constants - confirm whether they are still needed.
    private static class GracefulCloseInitiator
    {
        public const int None = 0;
        public const int Server = 1;
        public const int Client = 2;
    }
}