File: Circuits\RemoteJSDataStream.cs
Web Access
Project: src\src\Components\Server\src\Microsoft.AspNetCore.Components.Server.csproj (Microsoft.AspNetCore.Components.Server)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.IO.Pipelines;
using Microsoft.JSInterop;
 
namespace Microsoft.AspNetCore.Components.Server.Circuits;
 
internal sealed class RemoteJSDataStream : Stream
{
    private readonly RemoteJSRuntime _runtime;
    private readonly long _streamId;
    private readonly long _totalLength;
    private readonly int _chunkSize;
    private readonly TimeSpan _jsInteropDefaultCallTimeout;
    private readonly CancellationToken _streamCancellationToken;
    private readonly Stream _pipeReaderStream;
    private readonly Pipe _pipe;
    private long _bytesRead;
    private long _expectedChunkId;
    private DateTimeOffset _lastDataReceivedTime;
    private bool _disposed;
 
    public static async Task<bool> ReceiveData(RemoteJSRuntime runtime, long streamId, long chunkId, byte[] chunk, string error)
    {
        if (!runtime.RemoteJSDataStreamInstances.TryGetValue(streamId, out var instance))
        {
            // There is no data stream with the given identifier. It may have already been disposed.
            // We notify JS that the stream has been cancelled/disposed.
            return false;
        }
 
        return await instance.ReceiveData(chunkId, chunk, error);
    }
 
    public static async ValueTask<RemoteJSDataStream> CreateRemoteJSDataStreamAsync(
        RemoteJSRuntime runtime,
        IJSStreamReference jsStreamReference,
        long totalLength,
        long signalRMaximumIncomingBytes,
        TimeSpan jsInteropDefaultCallTimeout,
        CancellationToken cancellationToken = default)
    {
        // Enforce minimum 1 kb, maximum 50 kb, SignalR message size.
        // We budget 512 bytes overhead for the transfer, thus leaving at least 512 bytes for data
        // transfer per chunk with a 1 kb message size.
        // Additionally, to maintain interactivity, we put an upper limit of 50 kb on the message size.
        var chunkSize = signalRMaximumIncomingBytes > 1024 ?
            (int)Math.Min(signalRMaximumIncomingBytes, 50 * 1024) - 512 :
            throw new ArgumentException($"SignalR MaximumIncomingBytes must be at least 1 kb.");
 
        var streamId = runtime.RemoteJSDataStreamNextInstanceId++;
        var remoteJSDataStream = new RemoteJSDataStream(runtime, streamId, totalLength, chunkSize, jsInteropDefaultCallTimeout, cancellationToken);
        await runtime.InvokeVoidAsync("Blazor._internal.sendJSDataStream", jsStreamReference, streamId, chunkSize);
        return remoteJSDataStream;
    }
 
    private RemoteJSDataStream(
        RemoteJSRuntime runtime,
        long streamId,
        long totalLength,
        int chunkSize,
        TimeSpan jsInteropDefaultCallTimeout,
        CancellationToken cancellationToken)
    {
        _runtime = runtime;
        _streamId = streamId;
        _totalLength = totalLength;
        _chunkSize = chunkSize;
        _jsInteropDefaultCallTimeout = jsInteropDefaultCallTimeout;
        _streamCancellationToken = cancellationToken;
 
        _lastDataReceivedTime = DateTimeOffset.UtcNow;
        _ = ThrowOnTimeout();
 
        _runtime.RemoteJSDataStreamInstances.Add(_streamId, this);
 
        _pipe = new Pipe();
        _pipeReaderStream = _pipe.Reader.AsStream();
        PipeReader = _pipe.Reader;
    }
 
    /// <summary>
    /// Gets a <see cref="PipeReader"/> to directly read data sent by the JavaScript client.
    /// </summary>
    public PipeReader PipeReader { get; }
 
    private async Task<bool> ReceiveData(long chunkId, byte[] chunk, string error)
    {
        try
        {
            if (!string.IsNullOrEmpty(error))
            {
                throw new InvalidOperationException($"An error occurred while reading the remote stream: {error}");
            }
 
            if (chunkId != _expectedChunkId)
            {
                throw new EndOfStreamException($"Out of sequence chunk received, expected {_expectedChunkId}, but received {chunkId}.");
            }
 
            ++_expectedChunkId;
 
            if (chunk.Length == 0)
            {
                throw new EndOfStreamException("The incoming data chunk cannot be empty.");
            }
 
            if (chunk.Length > _chunkSize)
            {
                throw new EndOfStreamException("The incoming data chunk exceeded the permitted length.");
            }
 
            _bytesRead += chunk.Length;
 
            if (_bytesRead > _totalLength)
            {
                throw new EndOfStreamException($"The incoming data stream declared a length {_totalLength}, but {_bytesRead} bytes were sent.");
            }
 
            // Start timeout _after_ performing validations on data.
            _lastDataReceivedTime = DateTimeOffset.UtcNow;
            _ = ThrowOnTimeout();
 
            await _pipe.Writer.WriteAsync(chunk, _streamCancellationToken);
 
            if (_bytesRead == _totalLength)
            {
                await CompletePipeAndDisposeStream();
            }
 
            return true;
        }
        catch (Exception e)
        {
            await CompletePipeAndDisposeStream(e);
 
            // Fatal exception, crush the circuit. A well behaved client
            // should not result in this type of exception.
            if (e is EndOfStreamException)
            {
                throw;
            }
 
            return false;
        }
    }
 
    public override bool CanRead => true;
 
    public override bool CanSeek => false;
 
    public override bool CanWrite => false;
 
    public override long Length => _totalLength;
 
    public override long Position
    {
        get => _pipeReaderStream.Position;
        set => throw new NotSupportedException();
    }
 
    public override void Flush()
    {
        // No-op
    }
 
    public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
 
    public override int Read(byte[] buffer, int offset, int count)
        => throw new NotSupportedException("Synchronous reads are not supported.");
 
    public override long Seek(long offset, SeekOrigin origin)
        => throw new NotSupportedException();
 
    public override void SetLength(long value)
        => throw new NotSupportedException();
 
    public override void Write(byte[] buffer, int offset, int count)
        => throw new NotSupportedException();
 
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        using var linkedCts = ValueLinkedCancellationTokenSource.Create(_streamCancellationToken, cancellationToken);
        return await _pipeReaderStream.ReadAsync(buffer.AsMemory(offset, count), linkedCts.Token);
    }
 
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        using var linkedCts = ValueLinkedCancellationTokenSource.Create(_streamCancellationToken, cancellationToken);
        return await _pipeReaderStream.ReadAsync(buffer, linkedCts.Token);
    }
 
    private async Task ThrowOnTimeout()
    {
        await Task.Delay(_jsInteropDefaultCallTimeout);
 
        if (!_disposed && (DateTimeOffset.UtcNow >= _lastDataReceivedTime.Add(_jsInteropDefaultCallTimeout)))
        {
            // Dispose of the stream if a chunk isn't received within the jsInteropDefaultCallTimeout.
            var timeoutException = new TimeoutException("Did not receive any data in the allotted time.");
            await CompletePipeAndDisposeStream(timeoutException);
            _runtime.RaiseUnhandledException(timeoutException);
        }
    }
 
    /// <summary>
    /// For testing purposes only.
    ///
    /// Triggers the timeout on the next check.
    /// </summary>
    internal void InvalidateLastDataReceivedTimeForTimeout()
    {
        _lastDataReceivedTime = _lastDataReceivedTime.Subtract(_jsInteropDefaultCallTimeout);
    }
 
    private async Task CompletePipeAndDisposeStream(Exception? ex = null)
    {
        await _pipe.Writer.CompleteAsync(ex);
        Dispose(true);
    }
 
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _runtime.RemoteJSDataStreamInstances.Remove(_streamId);
        }
 
        _disposed = true;
    }
 
    // A helper for creating and disposing linked CancellationTokenSources
    // without allocating, when possible.
    // Internal for testing.
    internal readonly struct ValueLinkedCancellationTokenSource : IDisposable
    {
        private readonly CancellationTokenSource? _linkedCts;
 
        public readonly CancellationToken Token;
 
        // For testing.
        internal bool HasLinkedCancellationTokenSource => _linkedCts is not null;
 
        public static ValueLinkedCancellationTokenSource Create(
            CancellationToken token1, CancellationToken token2)
        {
            if (!token1.CanBeCanceled)
            {
                return new(linkedCts: null, token2);
            }
 
            if (!token2.CanBeCanceled)
            {
                return new(linkedCts: null, token1);
            }
 
            var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(token1, token2);
            return new(linkedCts, linkedCts.Token);
        }
 
        private ValueLinkedCancellationTokenSource(CancellationTokenSource? linkedCts, CancellationToken token)
        {
            _linkedCts = linkedCts;
            Token = token;
        }
 
        public void Dispose()
        {
            _linkedCts?.Dispose();
        }
    }
}