// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Buffers;
using System.Diagnostics;
using System.IO.Pipelines;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
using Microsoft.AspNetCore.Server.Kestrel.Core.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure;
using Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Infrastructure.PipeWriterHelpers;
namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http;
#pragma warning disable CA1852 // Seal internal types
/// <summary>
/// HTTP/1.1 response output producer layered over a <see cref="PipeWriter"/>. Implements
/// <see cref="IHttpOutputProducer"/> and <see cref="IDisposable"/>, and keeps the bookkeeping needed for
/// chunked bodies and for data written before the response headers have been emitted.
/// </summary>
internal class Http1OutputProducer : IHttpOutputProducer, IDisposable
#pragma warning restore CA1852 // Seal internal types
// Pre-encoded UTF-8 (u8 literal) byte sequences used when writing responses.
private static ReadOnlySpan<byte> ContinueBytes => "HTTP/1.1 100 Continue\r\n\r\n"u8;
private static ReadOnlySpan<byte> HttpVersion11Bytes => "HTTP/1.1 "u8;
private static ReadOnlySpan<byte> EndHeadersBytes => "\r\n\r\n"u8;
private static ReadOnlySpan<byte> EndChunkedResponseBytes => "0\r\n\r\n"u8;
// Space budgeted for a chunk's prefix (upper bound) and suffix; GetChunkedMemory adds both to the
// caller's size hint, and EndChunkLength is also subtracted when slicing usable chunk memory.
private const int MaxBeginChunkLength = 10;
private const int EndChunkLength = 2;
private readonly string _connectionId;
private readonly BaseConnectionContext _connectionContext;
private readonly MemoryPool<byte> _memoryPool;
private readonly KestrelTrace _log;
private readonly IHttpMinResponseDataRateFeature _minResponseDataRateFeature;
private readonly ConnectionMetricsContext _connectionMetricsContext;
private readonly IHttpOutputAborter _outputAborter;
private readonly TimingPipeFlusher _flusher;
// This locks access to all of the below fields
private readonly Lock _contextLock = new();
private bool _pipeWriterCompleted;
private bool _aborted;
// Bytes committed to the pipe writer that have not yet been handed to the flusher as "bytes written".
private long _unflushedBytes;
// Prefix size computed for the current chunk memory (see UpdateCurrentChunkMemory).
private int _currentMemoryPrefixBytes;
private readonly ConcurrentPipeWriter _pipeWriter;
// Scratch buffers returned by GetFakeMemory once the pipe writer has been completed.
private IMemoryOwner<byte>? _fakeMemoryOwner;
private byte[]? _fakeMemory;
// Chunked responses need to be treated uniquely when using GetMemory + Advance.
// We need to know the size of the data written to the chunk before calling Advance on the
// PipeWriter, meaning we internally track how far we have advanced through a current chunk (_advancedBytesForChunk).
// Once write or flush is called, we modify the _currentChunkMemory to prepend the size of data written
// and append the end terminator.
private ResponseBodyMode _responseBodyMode;
private bool _writeStreamSuffixCalled;
private int _advancedBytesForChunk;
private Memory<byte> _currentChunkMemory;
private bool _currentChunkMemoryUpdated;
// Fields needed to store writes before calling either startAsync or Write/FlushAsync
// These should be cleared by the end of the request
private List<CompletedBuffer>? _completedSegments;
// Running total of the bytes held in _completedSegments (incremented in AddSegment).
private int _completedSegmentsByteCount;
private Memory<byte> _currentSegment;
private IMemoryOwner<byte>? _currentSegmentOwner;
// Write offset into _currentSegment.
private int _position;
// Set to true once WriteResponseHeadersInternal has run.
private bool _startCalled;
/// <summary>
/// Creates the producer, wrapping <paramref name="pipeWriter"/> in a <see cref="ConcurrentPipeWriter"/>
/// that shares this instance's lock, and building a <see cref="TimingPipeFlusher"/> for flushes.
/// </summary>
/// <param name="pipeWriter">The underlying writer that response bytes are written to.</param>
/// <param name="connectionId">Stored in <c>_connectionId</c>.</param>
/// <param name="connectionContext">Stored in <c>_connectionContext</c>.</param>
/// <param name="memoryPool">Pool used for buffered segments and fake memory; also given to the concurrent writer.</param>
/// <param name="log">Stored in <c>_log</c> and passed to the flusher.</param>
/// <param name="timeoutControl">Passed to the flusher; not stored on this instance.</param>
/// <param name="minResponseDataRateFeature">Source of the minimum data rate supplied on each flush.</param>
/// <param name="connectionMetricsContext">Metrics context used when recording the connection end reason in Abort.</param>
/// <param name="outputAborter">Passed to the flusher on every flush.</param>
public Http1OutputProducer(
PipeWriter pipeWriter,
string connectionId,
BaseConnectionContext connectionContext,
MemoryPool<byte> memoryPool,
KestrelTrace log,
ITimeoutControl timeoutControl,
IHttpMinResponseDataRateFeature minResponseDataRateFeature,
ConnectionMetricsContext connectionMetricsContext,
IHttpOutputAborter outputAborter)
// Allow appending more data to the PipeWriter when a flush is pending.
_pipeWriter = new ConcurrentPipeWriter(pipeWriter, memoryPool, _contextLock);
_connectionId = connectionId;
_connectionContext = connectionContext;
_memoryPool = memoryPool;
_log = log;
_minResponseDataRateFeature = minResponseDataRateFeature;
_connectionMetricsContext = connectionMetricsContext;
_outputAborter = outputAborter;
_flusher = new TimingPipeFlusher(timeoutControl, log);
/// <summary>
/// Writes <paramref name="buffer"/> via <c>WriteAsync</c> and exposes the outcome as a <see cref="Task"/>.
/// </summary>
/// <param name="buffer">Bytes to write.</param>
/// <param name="cancellationToken">Checked up front, then forwarded to <c>WriteAsync</c>.</param>
/// <returns>A canceled task when the token is already canceled; otherwise the task for the write.</returns>
public Task WriteDataAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
// Short-circuit before touching the lock or the pipe when cancellation was already requested.
if (cancellationToken.IsCancellationRequested)
return Task.FromCanceled(cancellationToken);
return WriteAsync(buffer, cancellationToken).GetAsTask();
/// <summary>
/// Same as <c>WriteDataAsync</c> but returns the <see cref="FlushResult"/> produced by <c>WriteAsync</c>.
/// </summary>
/// <param name="buffer">Bytes to write.</param>
/// <param name="cancellationToken">Checked up front, then forwarded to <c>WriteAsync</c>.</param>
/// <returns>A canceled <see cref="ValueTask{TResult}"/> when the token is already canceled; otherwise the write's result.</returns>
public ValueTask<FlushResult> WriteDataToPipeAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken = default)
if (cancellationToken.IsCancellationRequested)
return new ValueTask<FlushResult>(Task.FromCanceled<FlushResult>(cancellationToken));
return WriteAsync(buffer, cancellationToken);
/// <summary>
/// Finishes the response body. In chunked mode it writes <c>EndChunkedResponseBytes</c>; otherwise it
/// flushes when <c>_unflushedBytes</c> is non-zero. Records the call in <c>_writeStreamSuffixCalled</c>.
/// </summary>
/// <returns>The write/flush result, or a default <see cref="ValueTask{TResult}"/> when neither branch assigns one.</returns>
// NOTE(review): braces are not visible in this excerpt; the nesting described above is read from
// statement order — confirm against the full file.
public ValueTask<FlushResult> WriteStreamSuffixAsync()
ValueTask<FlushResult> result = default;
lock (_contextLock)
// Guard so the suffix is only produced once per response.
if (!_writeStreamSuffixCalled)
if (_responseBodyMode == ResponseBodyMode.Chunked)
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
result = WriteAsyncInternal(ref writer, EndChunkedResponseBytes);
else if (_unflushedBytes > 0)
result = FlushAsync();
_writeStreamSuffixCalled = true;
return result;
/// <summary>
/// Flushes pending output through <c>_flusher</c> while holding <c>_contextLock</c>. In chunked mode,
/// bytes advanced into the current chunk are committed first via the <c>FlushAsyncChunked</c> local function.
/// </summary>
/// <param name="cancellationToken">Forwarded to the flusher.</param>
/// <returns>
/// A result constructed as <c>new FlushResult(false, true)</c> (not canceled, completed) when the pipe
/// writer has already been completed; otherwise the flusher's result.
/// </returns>
public ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
lock (_contextLock)
if (_pipeWriterCompleted)
return new ValueTask<FlushResult>(new FlushResult(false, true));
if (_responseBodyMode == ResponseBodyMode.Chunked)
if (_advancedBytesForChunk > 0)
// If there is data that was chunked before flushing (ex someone did GetMemory->Advance->FlushAsync)
// make sure to write whatever was advanced first
return FlushAsyncChunked(this, cancellationToken);
// If there is an empty write, we still need to update the current chunk
_currentChunkMemoryUpdated = false;
// Snapshot and reset the counter so the flusher is told how many bytes this flush covers.
var bytesWritten = _unflushedBytes;
_unflushedBytes = 0;
return _flusher.FlushAsync(_minResponseDataRateFeature.MinDataRate, bytesWritten, _outputAborter, cancellationToken);
static ValueTask<FlushResult> FlushAsyncChunked(Http1OutputProducer producer, CancellationToken token)
// Local function so in the common-path the stack space for BufferWriter isn't reserved and cleared when it isn't used.
Debug.Assert(producer._responseBodyMode == ResponseBodyMode.Chunked && producer._advancedBytesForChunk > 0);
var writer = new BufferWriter<PipeWriter>(producer._pipeWriter);
producer.WriteCurrentChunkMemoryToPipeWriter(ref writer);
// Bytes committed by this writer are added to what was already pending.
var bytesWritten = producer._unflushedBytes + writer.BytesCommitted;
producer._unflushedBytes = 0;
// If there is an empty write, we still need to update the current chunk
producer._currentChunkMemoryUpdated = false;
return producer._flusher.FlushAsync(producer._minResponseDataRateFeature.MinDataRate, bytesWritten, producer._outputAborter, token);
/// <summary>
/// Returns writable memory, choosing the source by current state (checked in this order): fake memory
/// once the pipe writer is completed, a locally leased segment before the response has started,
/// chunk-aware memory in chunked mode, otherwise memory straight from the pipe writer.
/// </summary>
/// <param name="sizeHint">Requested minimum size, forwarded to whichever source is used.</param>
/// <returns>A writable <see cref="Memory{T}"/>.</returns>
public Memory<byte> GetMemory(int sizeHint = 0)
lock (_contextLock)
if (_pipeWriterCompleted)
return GetFakeMemory(sizeHint);
else if (!_startCalled)
return LeasedMemory(sizeHint);
else if (_responseBodyMode == ResponseBodyMode.Chunked)
return GetChunkedMemory(sizeHint);
return _pipeWriter.GetMemory(sizeHint);
/// <summary>
/// Span counterpart of <c>GetMemory</c>: applies the same state checks in the same order and returns
/// the <c>Span</c> of the selected memory.
/// </summary>
/// <param name="sizeHint">Requested minimum size, forwarded to whichever source is used.</param>
/// <returns>A writable <see cref="Span{T}"/>.</returns>
public Span<byte> GetSpan(int sizeHint = 0)
lock (_contextLock)
if (_pipeWriterCompleted)
return GetFakeMemory(sizeHint).Span;
else if (!_startCalled)
return LeasedMemory(sizeHint).Span;
else if (_responseBodyMode == ResponseBodyMode.Chunked)
return GetChunkedMemory(sizeHint).Span;
return _pipeWriter.GetMemory(sizeHint).Span;
/// <summary>
/// Records that <paramref name="bytes"/> bytes were written into the memory last handed out. Before the
/// response has started the count is added to <c>_position</c>; in chunked mode it is added to
/// <c>_advancedBytesForChunk</c>.
/// </summary>
/// <param name="bytes">Number of bytes written.</param>
/// <exception cref="ArgumentOutOfRangeException">The advance would run past the available buffer.</exception>
// NOTE(review): the statement guarded by the _pipeWriterCompleted check and any non-chunked
// fall-through branch are not visible in this excerpt.
public void Advance(int bytes)
lock (_contextLock)
if (_pipeWriterCompleted)
if (!_startCalled)
// NOTE(review): a non-negative check is visible only on this pre-start path — confirm negative
// values are handled as intended on the chunked path.
if (bytes >= 0)
// Equivalent to: _position + bytes > _currentSegment.Length.
if (_currentSegment.Length - bytes < _position)
throw new ArgumentOutOfRangeException(nameof(bytes), "Can't advance past buffer size.");
_position += bytes;
else if (_responseBodyMode == ResponseBodyMode.Chunked)
// Usable chunk capacity excludes the reserved prefix and the EndChunkLength suffix.
if (_advancedBytesForChunk > _currentChunkMemory.Length - _currentMemoryPrefixBytes - EndChunkLength - bytes)
throw new ArgumentOutOfRangeException(nameof(bytes), "Can't advance past buffer size.");
_advancedBytesForChunk += bytes;
/// <summary>
/// Total bytes accepted but not yet flushed: the current pre-start segment position, bytes advanced in
/// the current chunk, the pipe writer's own unflushed count, and bytes held in completed segments.
/// </summary>
public long UnflushedBytes
var bytes = _position + _advancedBytesForChunk + _pipeWriter.UnflushedBytes + _completedSegmentsByteCount;
return bytes;
/// <summary>
/// Presumably cancels an in-progress flush (per the name).
/// </summary>
// NOTE(review): the method body is not visible in this excerpt — confirm behavior against the full file.
public void CancelPendingFlush()
// This method is for chunked http responses that directly call response.WriteAsync
/// <summary>
/// Commits any bytes already advanced into the current chunk, then <paramref name="buffer"/>, and flushes.
/// </summary>
/// <param name="buffer">Chunk payload; may be empty.</param>
/// <param name="cancellationToken">Forwarded to <c>FlushAsync</c>.</param>
/// <returns>A default <see cref="ValueTask{TResult}"/> when the pipe writer is completed; otherwise the flush result.</returns>
public ValueTask<FlushResult> WriteChunkAsync(ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
lock (_contextLock)
if (_pipeWriterCompleted)
return default;
// Make sure any memory used with GetMemory/Advance is written before the chunk
// passed in.
if (_advancedBytesForChunk > 0 || buffer.Length > 0)
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
CommitChunkInternal(ref writer, buffer);
// Account for what the writer committed so the next flush reports it.
_unflushedBytes += writer.BytesCommitted;
return FlushAsync(cancellationToken);
/// <summary>
/// Writes the pending GetMemory/Advance chunk (if any) to <paramref name="writer"/> before handling
/// <paramref name="buffer"/>.
/// </summary>
/// <param name="writer">Buffer writer over the pipe writer.</param>
/// <param name="buffer">Chunk payload to emit when non-empty.</param>
// NOTE(review): the statements executed for a non-empty buffer (and any final commit) are not
// visible in this excerpt.
private void CommitChunkInternal(ref BufferWriter<PipeWriter> writer, ReadOnlySpan<byte> buffer)
if (_advancedBytesForChunk > 0)
WriteCurrentChunkMemoryToPipeWriter(ref writer);
if (buffer.Length > 0)
/// <summary>
/// Writes the response start (status and headers) under <c>_contextLock</c> by delegating to
/// <c>WriteResponseHeadersInternal</c>.
/// </summary>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="reasonPhrase">Optional reason phrase.</param>
/// <param name="responseHeaders">Headers to serialize.</param>
/// <param name="responseBodyMode">Body mode to record for the response.</param>
/// <param name="appComplete">Not referenced by the statements visible in this excerpt.</param>
// NOTE(review): the statement guarded by the _pipeWriterCompleted check is not visible here.
public void WriteResponseHeaders(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, bool appComplete)
lock (_contextLock)
if (_pipeWriterCompleted)
var buffer = _pipeWriter;
var writer = new BufferWriter<PipeWriter>(buffer);
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, responseBodyMode);
/// <summary>
/// Serializes the headers into <paramref name="writer"/>, records the body mode, emits any data that
/// was buffered before the headers, updates <c>_unflushedBytes</c>, and marks the response as started.
/// </summary>
/// <param name="writer">Buffer writer over the pipe writer.</param>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="reasonPhrase">Optional reason phrase.</param>
/// <param name="responseHeaders">Headers copied into the writer.</param>
/// <param name="responseBodyMode">Must not be <c>Uninitialized</c> (asserted in debug builds).</param>
// NOTE(review): the statements that write the status line (using statusBytes) and the header
// terminator are not visible in this excerpt.
private void WriteResponseHeadersInternal(ref BufferWriter<PipeWriter> writer, int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode)
var statusBytes = ReasonPhrases.ToStatusBytes(statusCode, reasonPhrase);
responseHeaders.CopyTo(ref writer);
Debug.Assert(responseBodyMode != ResponseBodyMode.Uninitialized);
// The mode must be set before buffered data is emitted: WriteDataWrittenBeforeHeaders branches on it.
_responseBodyMode = responseBodyMode;
WriteDataWrittenBeforeHeaders(ref writer);
_unflushedBytes += writer.BytesCommitted;
_startCalled = true;
/// <summary>
/// Emits data that was buffered before the response started — first the completed segments, then the
/// used part of the current segment — and resets the associated counters. In chunked mode each piece is
/// written as a chunk via <c>CommitChunkInternal</c>.
/// </summary>
/// <param name="writer">Buffer writer over the pipe writer.</param>
// NOTE(review): the statements for the ContentLength branches and the per-segment cleanup are not
// visible in this excerpt.
private void WriteDataWrittenBeforeHeaders(ref BufferWriter<PipeWriter> writer)
if (_completedSegments != null)
foreach (var segment in _completedSegments)
if (_responseBodyMode == ResponseBodyMode.Chunked)
CommitChunkInternal(ref writer, segment.Span);
else if (_responseBodyMode == ResponseBodyMode.ContentLength)
_completedSegmentsByteCount = 0;
if (!_currentSegment.IsEmpty)
if (_responseBodyMode != ResponseBodyMode.Disabled)
// Only the first _position bytes of the current segment contain written data.
var segment = _currentSegment.Slice(0, _position);
if (_responseBodyMode == ResponseBodyMode.Chunked)
CommitChunkInternal(ref writer, segment.Span);
else if (_responseBodyMode == ResponseBodyMode.ContentLength)
_position = 0;
/// <summary>
/// Releases buffered state under <c>_contextLock</c>: clears the fake-memory references and walks any
/// completed segments that were never written.
/// </summary>
// NOTE(review): the calls that return/dispose the buffers (and any pipe completion) are not
// visible in this excerpt; only the null assignments and the loop header are.
public void Dispose()
lock (_contextLock)
if (_fakeMemoryOwner != null)
_fakeMemoryOwner = null;
if (_fakeMemory != null)
_fakeMemory = null;
// Call dispose on any memory that wasn't written.
if (_completedSegments != null)
foreach (var segment in _completedSegments)
/// <summary>
/// Drops the current pre-start segment by resetting <c>_currentSegmentOwner</c> and <c>_currentSegment</c>.
/// </summary>
// NOTE(review): a dispose call on the owner is not visible in this excerpt — confirm.
private void DisposeCurrentSegment()
_currentSegmentOwner = null;
_currentSegment = default;
/// <summary>
/// Marks the pipe writer as completed; the flag check makes repeated calls a no-op for the visible statement.
/// </summary>
// NOTE(review): the call that actually completes the underlying writer is not visible in this excerpt.
private void CompletePipe()
if (!_pipeWriterCompleted)
_pipeWriterCompleted = true;
/// <summary>
/// Aborts output: under <c>_contextLock</c>, records <paramref name="reason"/> on the connection
/// metrics context and sets <c>_aborted</c>.
/// </summary>
/// <param name="error">The abort exception; not referenced by the statements visible in this excerpt.</param>
/// <param name="reason">End reason recorded via <c>KestrelMetrics.AddConnectionEndReason</c>.</param>
public void Abort(ConnectionAbortedException error, ConnectionEndReason reason)
// Abort can be called after Dispose if there's a flush timeout.
// It's important to still call _lifetimeFeature.Abort() in this case.
// NOTE(review): no _lifetimeFeature member appears among the visible fields, so the comment above
// may be stale. The statement guarded by the _aborted check and the actual abort call are also
// not visible in this excerpt.
lock (_contextLock)
if (_aborted)
KestrelMetrics.AddConnectionEndReason(_connectionMetricsContext, reason);
_aborted = true;
/// <summary>
/// Takes <c>_contextLock</c>.
/// </summary>
// NOTE(review): the statements executed under the lock are not visible in this excerpt.
public void Stop()
lock (_contextLock)
/// <summary>
/// Writes the pre-encoded <c>HTTP/1.1 100 Continue</c> interim response (<c>ContinueBytes</c>) via <c>WriteAsync</c>.
/// </summary>
/// <returns>The result of the write.</returns>
public ValueTask<FlushResult> Write100ContinueAsync()
return WriteAsync(ContinueBytes);
/// <summary>
/// Writes the response headers and the first body bytes through a single <c>BufferWriter</c>, under <c>_contextLock</c>.
/// </summary>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="reasonPhrase">Optional reason phrase.</param>
/// <param name="responseHeaders">Headers to serialize.</param>
/// <param name="responseBodyMode">Body mode to record for the response.</param>
/// <param name="buffer">First body bytes.</param>
/// <param name="cancellationToken">Forwarded to <c>WriteAsyncInternal</c>.</param>
/// <returns>
/// A result constructed as <c>new FlushResult(false, true)</c> when the pipe writer is already completed;
/// otherwise the result of <c>WriteAsyncInternal</c>.
/// </returns>
public ValueTask<FlushResult> FirstWriteAsync(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
lock (_contextLock)
if (_pipeWriterCompleted)
return new ValueTask<FlushResult>(new FlushResult(false, true));
// Uses same BufferWriter to write response headers and response
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, responseBodyMode);
return WriteAsyncInternal(ref writer, buffer, cancellationToken);
/// <summary>
/// Chunked counterpart of <c>FirstWriteAsync</c>: writes the headers, commits <paramref name="buffer"/>
/// as a chunk through the same <c>BufferWriter</c>, then flushes.
/// </summary>
/// <param name="statusCode">HTTP status code.</param>
/// <param name="reasonPhrase">Optional reason phrase.</param>
/// <param name="responseHeaders">Headers to serialize.</param>
/// <param name="responseBodyMode">Body mode to record for the response.</param>
/// <param name="buffer">First chunk payload.</param>
/// <param name="cancellationToken">Forwarded to <c>FlushAsync</c>.</param>
/// <returns>
/// A result constructed as <c>new FlushResult(false, true)</c> when the pipe writer is already completed;
/// otherwise the flush result.
/// </returns>
public ValueTask<FlushResult> FirstWriteChunkedAsync(int statusCode, string? reasonPhrase, HttpResponseHeaders responseHeaders, ResponseBodyMode responseBodyMode, ReadOnlySpan<byte> buffer, CancellationToken cancellationToken)
lock (_contextLock)
if (_pipeWriterCompleted)
return new ValueTask<FlushResult>(new FlushResult(false, true));
// Uses same BufferWriter to write response headers and chunk
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteResponseHeadersInternal(ref writer, statusCode, reasonPhrase, responseHeaders, responseBodyMode);
CommitChunkInternal(ref writer, buffer);
_unflushedBytes += writer.BytesCommitted;
return FlushAsync(cancellationToken);
/// <summary>
/// Resets per-response state so the producer can serve another response. Debug builds assert that no
/// pre-start segment or completed segments are still held.
/// </summary>
public void Reset()
Debug.Assert(_currentSegmentOwner == null);
Debug.Assert(_completedSegments == null || _completedSegments.Count == 0);
// Cleared in sequential address ascending order
_currentMemoryPrefixBytes = 0;
_responseBodyMode = ResponseBodyMode.Uninitialized;
_writeStreamSuffixCalled = false;
_currentChunkMemoryUpdated = false;
_startCalled = false;
/// <summary>
/// Takes <c>_contextLock</c>, creates a <c>BufferWriter</c> over the pipe writer, and delegates to
/// <c>WriteAsyncInternal</c>.
/// </summary>
/// <param name="buffer">Bytes to write.</param>
/// <param name="cancellationToken">Forwarded to <c>WriteAsyncInternal</c>.</param>
/// <returns>
/// A result constructed as <c>new FlushResult(false, true)</c> when the pipe writer is already completed;
/// otherwise the result of <c>WriteAsyncInternal</c>.
/// </returns>
private ValueTask<FlushResult> WriteAsync(
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken = default)
lock (_contextLock)
if (_pipeWriterCompleted)
return new ValueTask<FlushResult>(new FlushResult(false, true));
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
return WriteAsyncInternal(ref writer, buffer, cancellationToken);
/// <summary>
/// Writes <paramref name="buffer"/> through <paramref name="writer"/> and flushes. In chunked mode any
/// bytes already advanced into the current chunk are committed first.
/// </summary>
/// <param name="writer">Buffer writer over the pipe writer; its committed byte count is included in the flush total.</param>
/// <param name="buffer">Bytes to write; may be empty.</param>
/// <param name="cancellationToken">Not referenced by the statements visible in this excerpt.</param>
/// <returns>The flusher's result.</returns>
// NOTE(review): callers visible in this file invoke this while holding _contextLock. The statements
// that copy the buffer into the writer and the arguments of the final FlushAsync call are not
// visible in this excerpt.
private ValueTask<FlushResult> WriteAsyncInternal(
ref BufferWriter<PipeWriter> writer,
ReadOnlySpan<byte> buffer,
CancellationToken cancellationToken = default)
if (_responseBodyMode == ResponseBodyMode.Chunked)
if (_advancedBytesForChunk > 0)
// If there is data that was chunked before writing (ex someone did GetMemory->Advance->WriteAsync)
// make sure to write whatever was advanced first
WriteCurrentChunkMemoryToPipeWriter(ref writer);
// If there is an empty write, we still need to update the current chunk
_currentChunkMemoryUpdated = false;
if (buffer.Length > 0)
// Total for this flush: previously pending bytes plus what this writer committed.
var bytesWritten = _unflushedBytes + writer.BytesCommitted;
_unflushedBytes = 0;
return _flusher.FlushAsync(
/// <summary>
/// Returns the writable window of the current chunk memory: the region after the reserved prefix and
/// the bytes already advanced, stopping <c>EndChunkLength</c> bytes short of the end.
/// </summary>
/// <param name="sizeHint">Requested payload size; padded with prefix/suffix space when new chunk memory is needed.</param>
/// <returns>The slice of <c>_currentChunkMemory</c> available for payload bytes.</returns>
// NOTE(review): the calls that obtain fresh chunk memory (presumably UpdateCurrentChunkMemory) are
// not visible in this excerpt.
private Memory<byte> GetChunkedMemory(int sizeHint)
if (!_currentChunkMemoryUpdated)
// Calculating ChunkWriter.GetBeginChunkByteCount isn't free, so instead, we can add
// the max length for the prefix and suffix and add it to the sizeHint.
// This still guarantees that the memory passed in will be larger than the sizeHint.
sizeHint += MaxBeginChunkLength + EndChunkLength;
// Check if we need to allocate a new memory.
else if (_advancedBytesForChunk >= _currentChunkMemory.Length - _currentMemoryPrefixBytes - EndChunkLength - sizeHint && _advancedBytesForChunk > 0)
sizeHint += MaxBeginChunkLength + EndChunkLength;
// Commit what has been advanced so far as its own chunk before moving to new memory.
var writer = new BufferWriter<PipeWriter>(_pipeWriter);
WriteCurrentChunkMemoryToPipeWriter(ref writer);
_unflushedBytes += writer.BytesCommitted;
var actualMemory = _currentChunkMemory.Slice(
_currentMemoryPrefixBytes + _advancedBytesForChunk,
_currentChunkMemory.Length - _currentMemoryPrefixBytes - EndChunkLength - _advancedBytesForChunk);
return actualMemory;
/// <summary>
/// Fetches new chunk memory from the pipe writer, computes the prefix size for it, and marks the chunk
/// memory as up to date.
/// </summary>
/// <param name="sizeHint">Minimum size requested from the pipe writer.</param>
private void UpdateCurrentChunkMemory(int sizeHint)
_currentChunkMemory = _pipeWriter.GetMemory(sizeHint);
_currentMemoryPrefixBytes = ChunkWriter.GetPrefixBytesForChunk(_currentChunkMemory.Length, out var sliceOne);
// When the helper reports sliceOne, the usable memory is shortened by one byte.
// NOTE(review): the reason is defined by ChunkWriter.GetPrefixBytesForChunk, which is not visible here.
if (sliceOne)
_currentChunkMemory = _currentChunkMemory.Slice(0, _currentChunkMemory.Length - 1);
_currentChunkMemoryUpdated = true;
/// <summary>
/// Turns the bytes advanced into the current chunk memory into a chunk: writes the chunk prefix for
/// <c>_advancedBytesForChunk</c> bytes, moves the payload up against the prefix when the prefix is
/// shorter than the space reserved for it, and resets <c>_advancedBytesForChunk</c>.
/// </summary>
/// <param name="writer">Buffer writer over the pipe writer.</param>
// NOTE(review): the statements that advance the writer past the payload and write the chunk suffix
// are not visible in this excerpt.
private void WriteCurrentChunkMemoryToPipeWriter(ref BufferWriter<PipeWriter> writer)
Debug.Assert(_advancedBytesForChunk <= _currentChunkMemory.Length);
Debug.Assert(_advancedBytesForChunk > 0);
var bytesWritten = writer.WriteBeginChunkBytes(_advancedBytesForChunk);
Debug.Assert(bytesWritten <= _currentMemoryPrefixBytes);
if (bytesWritten < _currentMemoryPrefixBytes)
// If the current chunk of memory isn't completely utilized, we need to copy the contents forwards.
// This occurs if someone uses less than 255 bytes of the current Memory segment.
// Therefore, we need to copy it forwards by either 1 or 2 bytes (depending on number of bytes)
_currentChunkMemory.Slice(_currentMemoryPrefixBytes, _advancedBytesForChunk).CopyTo(_currentChunkMemory.Slice(bytesWritten));
_advancedBytesForChunk = 0;
/// <summary>
/// Returns scratch memory of at least <paramref name="minSize"/> bytes, used by GetMemory/GetSpan once
/// the pipe writer has been completed. Reuses a previously rented buffer when one is held; otherwise
/// rents from <c>_memoryPool</c> when the size fits, or from <see cref="ArrayPool{T}.Shared"/>.
/// </summary>
/// <param name="minSize">Minimum number of bytes required.</param>
/// <returns>Memory backed by <c>_fakeMemoryOwner</c> or <c>_fakeMemory</c>.</returns>
// NOTE(review): the statements that release a too-small buffer, and the else keywords separating the
// "too small" path from the reuse path, are not visible in this excerpt.
internal Memory<byte> GetFakeMemory(int minSize)
// Try to reuse _fakeMemoryOwner
if (_fakeMemoryOwner != null)
if (_fakeMemoryOwner.Memory.Length < minSize)
_fakeMemoryOwner = null;
return _fakeMemoryOwner.Memory;
// Try to reuse _fakeMemory
if (_fakeMemory != null)
if (_fakeMemory.Length < minSize)
_fakeMemory = null;
return _fakeMemory;
// Requesting a bigger buffer could throw.
if (minSize <= _memoryPool.MaxBufferSize)
// Use the specified pool as it fits.
_fakeMemoryOwner = _memoryPool.Rent(minSize);
return _fakeMemoryOwner.Memory;
// Use the array pool. Its MaxBufferSize is int.MaxValue.
return _fakeMemory = ArrayPool<byte>.Shared.Rent(minSize);
/// <summary>
/// Returns the unwritten tail of the current pre-start segment (everything from <c>_position</c> onward).
/// </summary>
/// <param name="sizeHint">Requested minimum size; not referenced by the statement visible in this excerpt.</param>
/// <returns>The remaining slice of <c>_currentSegment</c>.</returns>
// NOTE(review): a capacity check (presumably EnsureCapacity(sizeHint)) is not visible here — confirm.
private Memory<byte> LeasedMemory(int sizeHint)
return _currentSegment.Slice(_position);
/// <summary>
/// Checks whether the current pre-start segment has room for <paramref name="sizeHint"/> bytes.
/// </summary>
/// <param name="sizeHint">Requested size; 0 means any non-zero remaining capacity is acceptable.</param>
// NOTE(review): the statements that run after the check (early exit, adding a segment) are not
// visible in this excerpt.
private void EnsureCapacity(int sizeHint)
// _currentSegment is a Memory<byte> struct, so it is never null; when it is default (empty) its
// Length is 0. NOTE(review): this yields 0 only if _position is also 0 at that point — confirm.
var remainingSize = _currentSegment.Length - _position;
// If the sizeHint is 0, any capacity will do
// Otherwise, the buffer must have enough space for the entire size hint, or we need to add a segment.
if ((sizeHint == 0 && remainingSize > 0) || (sizeHint > 0 && remainingSize >= sizeHint))
// We have capacity in the current segment
/// <summary>
/// Retires the current pre-start segment (if any) into <c>_completedSegments</c> and installs a new one
/// sized for <paramref name="sizeHint"/>, resetting <c>_position</c> to 0. The new segment comes from
/// <c>_memoryPool</c> when the hint fits its <c>MaxBufferSize</c>, from <c>MemoryPool&lt;byte&gt;.Shared</c>
/// when it fits that pool's limit, and otherwise is a plain <c>byte[]</c> with no owner.
/// </summary>
/// <param name="sizeHint">Minimum size of the new segment.</param>
// NOTE(review): the else keyword ahead of the plain-array fallback is not visible in this excerpt.
private void AddSegment(int sizeHint = 0)
if (_currentSegment.Length != 0)
// We're adding a segment to the list
if (_completedSegments == null)
_completedSegments = new List<CompletedBuffer>();
// Position might be less than the segment length if there wasn't enough space to satisfy the sizeHint when
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
// ignore any empty space in it.
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
_completedSegmentsByteCount += _position;
if (sizeHint <= _memoryPool.MaxBufferSize)
// Get a new buffer using the minimum segment size, unless the size hint is larger than a single segment.
// Also, the size cannot be larger than the MaxBufferSize of the MemoryPool
var owner = _memoryPool.Rent(sizeHint);
_currentSegment = owner.Memory;
_currentSegmentOwner = owner;
else if (sizeHint <= MemoryPool<byte>.Shared.MaxBufferSize)
// fallback to ArrayPool instead of the passed in memory pool (default is PinnedBlockMemoryPool)
// PinnedBlockMemoryPool currently defaults to a low (4k) max buffer size while ArrayPool is 2G
var owner = MemoryPool<byte>.Shared.Rent(sizeHint);
_currentSegment = owner.Memory;
_currentSegmentOwner = owner;
// Last resort: an unpooled array, so there is no owner to dispose later.
_currentSegment = new byte[sizeHint];
_currentSegmentOwner = null;
_position = 0;
/// <summary>
/// Checks <c>_writeStreamSuffixCalled</c>, i.e. whether <c>WriteStreamSuffixAsync</c> has already run.
/// </summary>
// NOTE(review): the guarded statement is not visible in this excerpt; presumably it calls
// ThrowSuffixSent() — confirm.
private void ThrowIfSuffixSent()
if (_writeStreamSuffixCalled)
/// <summary>
/// Throw helper for writes attempted after the writer was completed.
/// </summary>
/// <exception cref="InvalidOperationException">Always thrown.</exception>
private static void ThrowSuffixSent()
throw new InvalidOperationException("Writing is not allowed after writer was completed.");
/// <summary>
/// Records a filled pre-start segment: the backing <see cref="Memory{T}"/>, the number of bytes actually
/// written into it, and the optional <see cref="IMemoryOwner{T}"/> it was rented from. The owner is
/// null when AddSegment fell back to a plain <c>byte[]</c> allocation.
/// </summary>
private readonly struct CompletedBuffer
private readonly IMemoryOwner<byte>? _memoryOwner;
// The whole backing buffer, including any unused tail.
public Memory<byte> Buffer { get; }
// Number of bytes written at the start of Buffer.
public int Length { get; }
// Only the written portion of Buffer.
public ReadOnlySpan<byte> Span => Buffer.Span.Slice(0, Length);
public CompletedBuffer(IMemoryOwner<byte>? owner, Memory<byte> buffer, int length)
_memoryOwner = owner;
Buffer = buffer;
Length = length;
public void Return()