// File: Core\IO\AsyncIOEngine.cs
// Web Access
// Project: src\src\Servers\IIS\IIS\src\Microsoft.AspNetCore.Server.IIS.csproj (Microsoft.AspNetCore.Server.IIS)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Buffers;
using System.Diagnostics;
 
namespace Microsoft.AspNetCore.Server.IIS.Core.IO;
 
internal sealed partial class AsyncIOEngine : IAsyncIOEngine, IDisposable
{
    // Largest number of segments handed to a single write operation. WriteAsync routes sequences
    // with more segments than this through WriteDataOverChunksLimit, which writes them in batches.
    private const ushort ResponseMaxChunks = 65533;

    // Supplies _contextLock (guards the scheduling state below) and ClientDisconnected.
    private readonly IISHttpContext _context;
    // Handle passed to every operation's Initialize call and to NativeMethods.HttpTryCancelIO.
    private readonly NativeSafeHandle _handler;

    // Set by Complete and Dispose. Once set, Run and NotifyCompletion complete operations with
    // ERROR_OPERATION_ABORTED instead of invoking them.
    private bool _stopped;

    // The single operation waiting behind _runningOperation; NotifyCompletion picks it up.
    private AsyncIOOperation? _nextOperation;
    // The operation whose Invoke returned null (it did not complete inline) and is still pending.
    private AsyncIOOperation? _runningOperation;

    // One-slot caches: the Get*Operation methods take the instance out (or allocate a new one),
    // and the ReturnOperation overloads store an instance back.
    private AsyncReadOperation? _cachedAsyncReadOperation;
    private AsyncWriteOperation? _cachedAsyncWriteOperation;
    private AsyncFlushOperation? _cachedAsyncFlushOperation;
 
    /// <summary>
    /// Creates an engine bound to the given request context and native handle.
    /// </summary>
    /// <param name="context">The context whose lock and disconnect state the engine uses.</param>
    /// <param name="handler">The handle passed to every I/O operation this engine schedules.</param>
    public AsyncIOEngine(IISHttpContext context, NativeSafeHandle handler)
        => (_context, _handler) = (context, handler);
 
    /// <summary>
    /// Schedules a read into <paramref name="memory"/>.
    /// </summary>
    /// <param name="memory">The buffer handed to the read operation.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> backed by the read operation.</returns>
    public ValueTask<int> ReadAsync(Memory<byte> memory)
    {
        AsyncReadOperation operation = GetReadOperation();
        operation.Initialize(_handler, memory);

        // Run either starts the operation, queues it, or aborts it if the engine is stopped.
        Run(operation);

        return new ValueTask<int>(operation, 0);
    }
 
    /// <summary>
    /// Writes <paramref name="data"/>, splitting it into several writes when it has more
    /// segments than <see cref="ResponseMaxChunks"/>.
    /// </summary>
    /// <param name="data">The sequence to write.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> representing the write.</returns>
    public ValueTask<int> WriteAsync(ReadOnlySequence<byte> data)
        => SegmentsOverChunksLimit(data)
            ? WriteDataOverChunksLimit(data)
            : WriteDataAsync(data);
 
    /// <summary>
    /// Schedules a single write operation for <paramref name="data"/>.
    /// </summary>
    /// <param name="data">The sequence handed to the write operation as-is.</param>
    /// <returns>A <see cref="ValueTask{TResult}"/> backed by the write operation.</returns>
    private ValueTask<int> WriteDataAsync(in ReadOnlySequence<byte> data)
    {
        AsyncWriteOperation operation = GetWriteOperation();
        operation.Initialize(_handler, data);

        // Run either starts the operation, queues it, or aborts it if the engine is stopped.
        Run(operation);

        return new ValueTask<int>(operation, 0);
    }
 
    // When the number of chunks exceeds ResponseMaxChunks we need to make multiple calls
    // to the native API https://learn.microsoft.com/iis/web-development-reference/native-code-api-reference/ihttpresponse-writeentitychunks-method
    // Although the documentation states that passing more than 65535 chunks causes the function to throw an exception,
    // 65534 actually seems to be the maximum number of chunks allowed.
    // Also, there seems to be a problem when slicing a ReadOnlySequence on segment borders, tracked at https://github.com/dotnet/runtime/issues/67607
    // That's why we only allow 65533 chunks.
    /// <summary>
    /// Writes a sequence that has more segments than <see cref="ResponseMaxChunks"/> by issuing one
    /// write per batch of <see cref="ResponseMaxChunks"/> segments, awaiting each write before
    /// starting the next one.
    /// </summary>
    /// <param name="data">The sequence to write; it is consumed front to back.</param>
    /// <returns>The result of the last write that was issued, or 0 if none was issued.</returns>
    private async ValueTask<int> WriteDataOverChunksLimit(ReadOnlySequence<byte> data)
    {
        ushort segmentsCount = 0;

        // Byte length of the current batch. Kept as a long: a batch holds up to ResponseMaxChunks
        // segments, and an int sum would silently wrap once it passes int.MaxValue, which would make
        // the Slice calls below cut at the wrong position or throw.
        long length = 0;

        // Since the result is discarded in the only place it's used (IISHttpContext.WriteBody), we return the last result.
        // If we start using the result there, we should make sure we handle the value correctly here.
        var result = 0;

        // The enumerator is obtained once, from the value 'data' has when the loop starts, so
        // reassigning 'data' inside the loop does not disturb the iteration.
        foreach (var segment in data)
        {
            segmentsCount++;
            length += segment.Length;

            if (segmentsCount == ResponseMaxChunks)
            {
                // Flush the full batch, then drop it from the front of the remaining data.
                result = await WriteDataAsync(data.Slice(0, length));

                data = data.Slice(length);
                segmentsCount = 0;
                length = 0;
            }
        }

        // Whatever is left is a final, partially filled batch.
        if (segmentsCount > 0)
        {
            result = await WriteDataAsync(data);
        }

        return result;
    }
 
    /// <summary>
    /// Determines whether <paramref name="data"/> has more segments than <see cref="ResponseMaxChunks"/>.
    /// </summary>
    /// <param name="data">The sequence whose segments are counted.</param>
    /// <returns><see langword="true"/> when the segment count exceeds the limit; otherwise <see langword="false"/>.</returns>
    private static bool SegmentsOverChunksLimit(in ReadOnlySequence<byte> data)
    {
        // A single-segment sequence cannot exceed the limit, so it is never enumerated.
        if (!data.IsSingleSegment)
        {
            var seen = 0;
            var segments = data.GetEnumerator();

            // Stop counting as soon as the limit is crossed instead of walking the whole sequence.
            while (segments.MoveNext())
            {
                seen++;

                if (seen > ResponseMaxChunks)
                {
                    return true;
                }
            }
        }

        return false;
    }
 
    /// <summary>
    /// Starts <paramref name="ioOperation"/> immediately when nothing is running, or parks it as the
    /// single queued operation otherwise. Every decision is made while holding the context lock.
    /// </summary>
    /// <param name="ioOperation">The initialized operation to schedule.</param>
    /// <exception cref="InvalidOperationException">
    /// An operation is running and another one is already queued behind it.
    /// </exception>
    private void Run(AsyncIOOperation ioOperation)
    {
        lock (_context._contextLock)
        {
            // Abort all operation after IO was stopped
            if (_stopped)
            {
                ioOperation.Complete(NativeMethods.ERROR_OPERATION_ABORTED, 0);
                return;
            }

            if (_runningOperation == null)
            {
                // we are just starting operation so there would be no
                // continuation registered; a null result means the operation went async
                if (ioOperation.Invoke() == null)
                {
                    _runningOperation = ioOperation;
                }

                return;
            }

            // Something is in flight: only one operation may wait behind it.
            if (_nextOperation != null)
            {
                throw new InvalidOperationException("Only one queued operation is allowed");
            }

            _nextOperation = ioOperation;

            // If there is an active read cancel it
            if (_runningOperation is AsyncReadOperation)
            {
                NativeMethods.HttpTryCancelIO(_handler);
            }
        }
    }
 
    /// <summary>
    /// Schedules a flush operation.
    /// </summary>
    /// <param name="moreData">Passed through to the flush operation's Initialize call.</param>
    /// <returns>A <see cref="ValueTask"/> backed by the flush operation.</returns>
    public ValueTask FlushAsync(bool moreData)
    {
        AsyncFlushOperation operation = GetFlushOperation();
        operation.Initialize(_handler, moreData);

        // Run either starts the operation, queues it, or aborts it if the engine is stopped.
        Run(operation);

        return new ValueTask(operation, 0);
    }
 
    /// <summary>
    /// Completes the running operation with the given result, then starts (or, when the engine is
    /// stopped, aborts) the queued operation if there is one. Continuations are invoked only after
    /// the context lock has been released.
    /// </summary>
    /// <param name="hr">Result code passed to the running operation's Complete call.</param>
    /// <param name="bytes">Byte count passed to the running operation's Complete call.</param>
    public void NotifyCompletion(int hr, int bytes)
    {
        AsyncIOOperation.AsyncContinuation finished;
        AsyncIOOperation.AsyncContinuation? queued = null;

        lock (_context._contextLock)
        {
            Debug.Assert(_runningOperation != null);

            finished = _runningOperation.Complete(hr, bytes);

            // Take the queued operation and clear both slots before deciding what to do with it.
            var pending = _nextOperation;
            _nextOperation = null;
            _runningOperation = null;

            if (pending != null)
            {
                if (!_stopped)
                {
                    queued = pending.Invoke();

                    // operation went async
                    if (queued == null)
                    {
                        _runningOperation = pending;
                    }
                }
                else
                {
                    // Abort next operation if IO is stopped
                    queued = pending.Complete(NativeMethods.ERROR_OPERATION_ABORTED, 0);
                }
            }
        }

        finished.Invoke();
        queued?.Invoke();
    }
 
    /// <summary>
    /// Marks the engine as stopped and, unless the client has already disconnected, calls
    /// <c>NativeMethods.HttpTryCancelIO</c> for the handle. Both steps happen under the context lock.
    /// </summary>
    public void Complete()
    {
        lock (_context._contextLock)
        {
            _stopped = true;

            // Should only call CancelIO if the client hasn't disconnected
            if (_context.ClientDisconnected)
            {
                return;
            }

            NativeMethods.HttpTryCancelIO(_handler);
        }
    }
 
    /// <summary>
    /// Takes the cached read operation, leaving the cache slot empty, or creates a new one when
    /// the slot holds nothing.
    /// </summary>
    private AsyncReadOperation GetReadOperation()
    {
        var cached = Interlocked.Exchange(ref _cachedAsyncReadOperation, null);
        return cached ?? new AsyncReadOperation(this);
    }
 
    /// <summary>
    /// Takes the cached write operation, leaving the cache slot empty, or creates a new one when
    /// the slot holds nothing.
    /// </summary>
    private AsyncWriteOperation GetWriteOperation()
    {
        var cached = Interlocked.Exchange(ref _cachedAsyncWriteOperation, null);
        return cached ?? new AsyncWriteOperation(this);
    }
 
    /// <summary>
    /// Takes the cached flush operation, leaving the cache slot empty, or creates a new one when
    /// the slot holds nothing.
    /// </summary>
    private AsyncFlushOperation GetFlushOperation()
    {
        var cached = Interlocked.Exchange(ref _cachedAsyncFlushOperation, null);
        return cached ?? new AsyncFlushOperation(this);
    }
 
    /// <summary>
    /// Stores <paramref name="operation"/> in the read-operation cache slot.
    /// </summary>
    private void ReturnOperation(AsyncReadOperation operation)
        => Volatile.Write(ref _cachedAsyncReadOperation, operation);
 
    /// <summary>
    /// Stores <paramref name="operation"/> in the write-operation cache slot.
    /// </summary>
    private void ReturnOperation(AsyncWriteOperation operation)
        => Volatile.Write(ref _cachedAsyncWriteOperation, operation);
 
    /// <summary>
    /// Stores <paramref name="operation"/> in the flush-operation cache slot.
    /// </summary>
    private void ReturnOperation(AsyncFlushOperation operation)
        => Volatile.Write(ref _cachedAsyncFlushOperation, operation);
 
    /// <summary>
    /// Marks the engine as stopped. Unlike <see cref="Complete"/>, this neither takes the context
    /// lock nor requests cancellation of outstanding I/O.
    /// </summary>
    public void Dispose() => _stopped = true;
}