// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using System.Net.Http;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
using Microsoft.Extensions.Logging;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http3;
internal sealed class Http3OutputProducer : IHttpOutputProducer, IHttpOutputAborter
private readonly Http3FrameWriter _frameWriter;
private readonly TimingPipeFlusher _flusher;
private readonly KestrelTrace _log;
private readonly MemoryPool<byte> _memoryPool;
private readonly Http3Stream _stream;
private readonly Pipe _pipe;
private readonly PipeWriter _pipeWriter;
private readonly PipeReader _pipeReader;
private readonly Lock _dataWriterLock = new();
private ValueTask<FlushResult> _dataWriteProcessingTask;
private bool _startedWritingDataFrames;
private bool _streamCompleted;
private bool _disposed;
private bool _suffixSent;
private IMemoryOwner<byte>? _fakeMemoryOwner;
private byte[]? _fakeMemory;
public Http3OutputProducer(
Http3FrameWriter frameWriter,
MemoryPool<byte> pool,
Http3Stream stream,
KestrelTrace log)
_frameWriter = frameWriter;
_memoryPool = pool;
_stream = stream;
_log = log;
_pipe = CreateDataPipe(pool);
_pipeWriter = _pipe.Writer;
_pipeReader = _pipe.Reader;
_flusher = new TimingPipeFlusher(timeoutControl: null, log);
_dataWriteProcessingTask = ProcessDataWrites().Preserve();
public void StreamReset()
// Data background task has finished.
_suffixSent = false;
_startedWritingDataFrames = false;
_streamCompleted = false;
_dataWriteProcessingTask = ProcessDataWrites().Preserve();
// Called once Application code has exited
// Or on Dispose which also would occur after Application code finished
public void Complete()
lock (_dataWriterLock)
if (_fakeMemoryOwner != null)
_fakeMemoryOwner = null;
if (_fakeMemory != null)
_fakeMemory = null;
public void Dispose()
lock (_dataWriterLock)
if (_disposed)
_disposed = true;
// In HTTP/1.x, this aborts the entire connection. For HTTP/3 we abort the stream.
void IHttpOutputAborter.Abort(ConnectionAbortedException abortReason, ConnectionEndReason reason)
_stream.Abort(abortReason, Http3ErrorCode.InternalError);
void IHttpOutputAborter.OnInputOrOutputCompleted()
_stream.Abort(new ConnectionAbortedException($"{nameof(Http3OutputProducer)}.{nameof(ProcessDataWrites)} has completed."), Http3ErrorCode.InternalError);
public void Advance(int bytes)
lock (_dataWriterLock)
if (_streamCompleted)
_startedWritingDataFrames = true;
public long UnflushedBytes => _pipeWriter.UnflushedBytes;
public void CancelPendingFlush()
lock (_dataWriterLock)
if (_streamCompleted)
public ValueTask<FlushResult> FirstWriteAsync(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, ReadOnlySpan<byte> data, CancellationToken cancellationToken)
lock (_dataWriterLock)
WriteResponseHeaders(statusCode, reasonPhrase, responseHeaders, responseBodyMode, appCompleted: false);
return WriteDataToPipeAsync(data, cancellationToken);
public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, ReadOnlySpan<byte> data, CancellationToken cancellationToken)
throw new NotImplementedException();
public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested)
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
lock (_dataWriterLock)
if (_streamCompleted)
return new ValueTask<FlushResult>(new FlushResult(false, true));
if (_startedWritingDataFrames)
// If there's already been response data written to the stream, just wait for that. Any header
// should be in front of the data frames in the connection pipe. Trailers could change things.
return _flusher.FlushAsync(this, cancellationToken);
// Flushing the connection pipe ensures headers already in the pipe are flushed even if no data
// frames have been written.
return _frameWriter.FlushAsync(this, cancellationToken);
public Memory<byte> GetMemory(int sizeHint = 0)
lock (_dataWriterLock)
if (_streamCompleted)
return GetFakeMemory(sizeHint);
return _pipeWriter.GetMemory(sizeHint);
public Span<byte> GetSpan(int sizeHint = 0)
lock (_dataWriterLock)
if (_streamCompleted)
return GetFakeMemory(sizeHint).Span;
return _pipeWriter.GetSpan(sizeHint);
internal Memory<byte> GetFakeMemory(int minSize)
// Try to reuse _fakeMemoryOwner
if (_fakeMemoryOwner != null)
if (_fakeMemoryOwner.Memory.Length < minSize)
_fakeMemoryOwner = null;
return _fakeMemoryOwner.Memory;
// Try to reuse _fakeMemory
if (_fakeMemory != null)
if (_fakeMemory.Length < minSize)
_fakeMemory = null;
return _fakeMemory;
// Requesting a bigger buffer could throw.
if (minSize <= _memoryPool.MaxBufferSize)
// Use the specified pool as it fits.
_fakeMemoryOwner = _memoryPool.Rent(minSize);
return _fakeMemoryOwner.Memory;
// Use the array pool. Its MaxBufferSize is int.MaxValue.
return _fakeMemory = ArrayPool<byte>.Shared.Rent(minSize);
private void ThrowIfSuffixSent()
if (_suffixSent)
private static void ThrowSuffixSent()
throw new InvalidOperationException("Writing is not allowed after writer was completed.");
public void Reset()
public void Stop()
lock (_dataWriterLock)
if (_streamCompleted)
_streamCompleted = true;
// Application code could be using this PipeWriter, we cancel the next (or in progress) flush so they can observe this Stop
// Additionally, _streamCompleted will cause any future PipeWriter operations to noop
public ValueTask<FlushResult> Write100ContinueAsync()
lock (_dataWriterLock)
if (_streamCompleted)
return default;
return _frameWriter.Write100ContinueAsync();
public ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken)
throw new NotImplementedException();
public Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
lock (_dataWriterLock)
// This length check is important because we don't want to set _startedWritingDataFrames unless a data
// frame will actually be written causing the headers to be flushed.
if (_streamCompleted || data.Length == 0)
return Task.CompletedTask;
_startedWritingDataFrames = true;
return _flusher.FlushAsync(this, cancellationToken).GetAsTask();
public ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken)
if (cancellationToken.IsCancellationRequested)
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
lock (_dataWriterLock)
// This length check is important because we don't want to set _startedWritingDataFrames unless a data
// frame will actually be written causing the headers to be flushed.
if (_streamCompleted || data.Length == 0)
return new ValueTask<FlushResult>(new FlushResult(false, true));
_startedWritingDataFrames = true;
return _flusher.FlushAsync(this, cancellationToken);
public void WriteResponseHeaders(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, bool appCompleted)
// appCompleted flag is not used here. The write FIN is sent via the transport and not via the frame.
// Headers are written to buffer and flushed with a FIN when Http3FrameWriter.CompleteAsync is called
// in ProcessDataWrites.
lock (_dataWriterLock)
if (_streamCompleted)
_frameWriter.WriteResponseHeaders(statusCode, responseHeaders);
public ValueTask<FlushResult> WriteStreamSuffixAsync()
lock (_dataWriterLock)
if (_streamCompleted)
return _dataWriteProcessingTask;
_streamCompleted = true;
_suffixSent = true;
return _dataWriteProcessingTask;
private async ValueTask<FlushResult> ProcessDataWrites()
FlushResult flushResult = default;
ReadResult readResult;
readResult = await _pipeReader.ReadAsync();
if (readResult.IsCompleted && _stream.ResponseTrailers?.Count > 0)
// Output is ending and there are trailers to write
// Write any remaining content then write trailers
if (readResult.Buffer.Length > 0)
flushResult = await _frameWriter.WriteDataAsync(readResult.Buffer);
flushResult = await _frameWriter.WriteResponseTrailersAsync(_stream.StreamId, _stream.ResponseTrailers);
else if (readResult.IsCompleted)
if (readResult.Buffer.Length != 0)
// Headers have already been written and there is no other content to write
// Need to complete framewriter immediately as CompleteAsync could be called
// in the app delegate and we don't want to wait for the app delegate to
// finish before sending response.
await _frameWriter.CompleteAsync();
flushResult = default;
flushResult = await _frameWriter.WriteDataAsync(readResult.Buffer);
} while (!readResult.IsCompleted);
catch (OperationCanceledException)
// Writes should not throw for aborted streams/connections.
catch (Exception ex)
_log.LogCritical(ex, nameof(Http3OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected exception.");
await _pipeReader.CompleteAsync();
return flushResult;
static void ThrowUnexpectedState()
throw new InvalidOperationException(nameof(Http3OutputProducer) + "." + nameof(ProcessDataWrites) + " observed an unexpected state where the streams output ended with data still remaining in the pipe.");
private static Pipe CreateDataPipe(MemoryPool<byte> pool)
=> new Pipe(new PipeOptions
pool: pool,
readerScheduler: PipeScheduler.Inline,
writerScheduler: PipeScheduler.ThreadPool,
pauseWriterThreshold: 1,
resumeWriterThreshold: 1,
useSynchronizationContext: false,
minimumSegmentSize: pool.GetMinimumSegmentSize()