File: RemoteHostAssetReader.cs
Web Access
Project: src\src\Workspaces\Remote\Core\Microsoft.CodeAnalysis.Remote.Workspaces.csproj (Microsoft.CodeAnalysis.Remote.Workspaces)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Serialization;
using Roslyn.Utilities;
 
namespace Microsoft.CodeAnalysis.Remote;
 
/// <summary>
/// See <see cref="RemoteHostAssetWriter"/> for an explanation of the wire format we use when communicating assets
/// between the host and our OOP server.  This implements the code for reading assets transmitted over the wire.  <see
/// cref="RemoteHostAssetWriter"/> has the code for writing assets.
/// </summary>
internal readonly struct RemoteHostAssetReader<T, TArg>(
    PipeReader pipeReader,
    Checksum solutionChecksum,
    int objectCount,
    ISerializerService serializer,
    Action<Checksum, T, TArg> callback,
    TArg arg)
{
    private readonly PipeReader _pipeReader = pipeReader;
    private readonly Checksum _solutionChecksum = solutionChecksum;
    private readonly int _objectCount = objectCount;
    private readonly ISerializerService _serializer = serializer;
    private readonly Action<Checksum, T, TArg> _callback = callback;
    private readonly TArg _arg = arg;
 
    public ValueTask ReadDataAsync(CancellationToken cancellationToken)
    {
        // Suppress ExecutionContext flow for asynchronous operations operate on the pipe. In addition to avoiding
        // ExecutionContext allocations, this clears the LogicalCallContext and avoids the need to clone data set by
        // CallContext.LogicalSetData at each yielding await in the task tree.
        //
        // ⚠ DO NOT AWAIT INSIDE THE USING BLOCK LEXICALLY (it's fine to await within the call to
        // ReadDataSuppressedFlowAsync). The Dispose method that restores ExecutionContext flow must run on the same
        // thread where SuppressFlow was originally run.
        using var _ = FlowControlHelper.TrySuppressFlow();
        return ReadDataSuppressedFlowAsync(cancellationToken);
    }
 
    private async ValueTask ReadDataSuppressedFlowAsync(CancellationToken cancellationToken)
    {
        using var pipeReaderStream = _pipeReader.AsStream(leaveOpen: true);
 
        // Get an object reader over the stream.  Note: we do not check the validation bytes here as the stream is
        // currently pointing at header data prior to the object data.  Instead, we will check the validation bytes
        // prior to reading each asset out.
        using var objectReader = ObjectReader.GetReader(pipeReaderStream, leaveOpen: true, checkValidationBytes: false);
 
        // Ensure that no invariants were broken and that both sides of the communication channel are talking about the
        // same pinned solution.
        var responseSolutionChecksum = await ReadChecksumFromPipeReaderAsync(cancellationToken).ConfigureAwait(false);
        Contract.ThrowIfFalse(_solutionChecksum == responseSolutionChecksum);
 
        // Now actually read all the messages we expect to get.
        for (var i = 0; i < _objectCount; i++)
            await ReadSingleMessageAsync(objectReader, cancellationToken).ConfigureAwait(false);
    }
 
    private async ValueTask ReadSingleMessageAsync(ObjectReader objectReader, CancellationToken cancellationToken)
    {
        // For each message, read the sentinel byte and the length of the data chunk we'll be reading.
        var length = await CheckSentinelByteAndReadLengthAsync(cancellationToken).ConfigureAwait(false);
 
        // Now buffer in the rest of the data we need to read.  Because we're reading as much data in as we'll need to
        // consume, all further reading (for this single item) can handle synchronously without worrying about this
        // blocking the reading thread on cross-process pipe io.
        var fillReadResult = await _pipeReader.ReadAtLeastAsync(length, cancellationToken).ConfigureAwait(false);
 
        // Note: we have let the pipe reader know that we're done with 'read at least' call, but that we haven't
        // consumed anything from it yet.  Otherwise it will throw that another read can't start the objectReader
        // reading calls below.
        _pipeReader.AdvanceTo(fillReadResult.Buffer.Start);
 
        // Let the object reader do it's own individual object checking.
        objectReader.CheckValidationBytes();
 
        // Now do the actual read of the data, synchronously, from the buffers that are now in memory within our
        // process.  These reads will move the pipe-reader forward, without causing any blocking on async-io.
        var checksum = Checksum.ReadFrom(objectReader);
        var kind = (WellKnownSynchronizationKind)objectReader.ReadByte();
 
        var asset = _serializer.Deserialize(kind, objectReader, cancellationToken);
        Contract.ThrowIfNull(asset);
        _callback(checksum, (T)asset, _arg);
    }
 
    private async ValueTask<int> CheckSentinelByteAndReadLengthAsync(CancellationToken cancellationToken)
    {
        const int HeaderSize = sizeof(byte) + sizeof(int);
 
        var lengthReadResult = await _pipeReader.ReadAtLeastAsync(HeaderSize, cancellationToken).ConfigureAwait(false);
        var (sentinelByte, length) = ReadSentinelAndLength(lengthReadResult);
 
        // Check that the sentinel is correct, and move the pipe reader forward to the end of the header.
        Contract.ThrowIfTrue(sentinelByte != RemoteHostAssetWriter.MessageSentinelByte);
        _pipeReader.AdvanceTo(lengthReadResult.Buffer.GetPosition(HeaderSize));
 
        return length;
    }
 
    // Note on Checksum itself as it depends on SequenceReader, which is provided by nerdbank.streams on netstandard2.0
    // (which the Workspace layer does not depend on).
    private async ValueTask<Checksum> ReadChecksumFromPipeReaderAsync(CancellationToken cancellationToken)
    {
        var readChecksumResult = await _pipeReader.ReadAtLeastAsync(Checksum.HashSize, cancellationToken).ConfigureAwait(false);
 
        var checksum = ReadChecksum(readChecksumResult);
        _pipeReader.AdvanceTo(readChecksumResult.Buffer.GetPosition(Checksum.HashSize));
        return checksum;
    }
 
    private static (byte, int) ReadSentinelAndLength(ReadResult readResult)
    {
        var sequenceReader = new SequenceReader<byte>(readResult.Buffer);
        Contract.ThrowIfFalse(sequenceReader.TryRead(out var sentinel));
        Contract.ThrowIfFalse(sequenceReader.TryReadLittleEndian(out int length));
        return (sentinel, length);
    }
 
    private static Checksum ReadChecksum(ReadResult readResult)
    {
        var sequenceReader = new SequenceReader<byte>(readResult.Buffer);
        Span<byte> checksumBytes = stackalloc byte[Checksum.HashSize];
        Contract.ThrowIfFalse(sequenceReader.TryCopyTo(checksumBytes));
        return Checksum.From(checksumBytes);
    }
}