File: RemoteHostAssetWriter.cs
Web Access
Project: src\src\Workspaces\Remote\Core\Microsoft.CodeAnalysis.Remote.Workspaces.csproj (Microsoft.CodeAnalysis.Remote.Workspaces)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
using System;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.PooledObjects;
using Microsoft.CodeAnalysis.Serialization;
using Microsoft.CodeAnalysis.Shared.Utilities;
using Roslyn.Utilities;
 
namespace Microsoft.CodeAnalysis.Remote;
 
using static SerializableBytes;
using static SolutionAssetStorage;
using ChecksumAndAsset = (Checksum checksum, object asset);
 
/// <summary>
/// Contains the utilities for writing assets from the host to a pipe-writer and for reading those assets on the
/// server.  The format we use is as follows.  For each asset we're writing we write:
/// <code>
/// -------------------------------------------------------------------------
/// | sentinel (1 byte) | length of data (4 bytes) | data (variable length) |
/// -------------------------------------------------------------------------
/// </code>
/// The writing code will write out the sentinel-byte and data-length, ensuring it is flushed to the pipe-writer. This
/// allows the pipe-reader to immediately read that information so it can then pre-allocate the space for the data to go
/// into. After writing the data the writer will also flush, so the reader can then read the data out of the pipe into
/// its buffer.  Once present in the reader's buffer, synchronous deserialization can happen without any sync-over-async
/// blocking on async-io.
/// <para/> The sentinel byte serves to let us detect immediately on the reading side if something has gone wrong with
/// this system.
/// <para/> In order to be able to write out the data-length, the writer will first synchronously write the asset to an
/// in-memory buffer, then write that buffer's length to the pipe-writer, then copy the in-memory buffer to the writer.
/// <para/> When writing/reading the data-segment, we use the <see cref="ObjectWriter"/>/<see cref="ObjectReader"/>
/// subsystem.  This will write its own validation bits, and then the data describing the asset.  This data is:
/// <code>
/// ----------------------------------------------------------------------------------------------------------
/// | data (variable length)                                                                                 |
/// ----------------------------------------------------------------------------------------------------------
/// | ObjectWriter validation (2 bytes) | checksum (16 bytes) | kind (1 byte) | asset-data (asset specified) |
/// ----------------------------------------------------------------------------------------------------------
/// </code>
/// The validation bytes are followed by the checksum.  The checksum is needed in the message as assets can be found in
/// any order (they are not reported in the order of the array of checksums passed into the writing method). Following
/// this is the kind of the asset.  This kind is used by the reading code to know which asset-deserialization routine to
/// invoke. Finally, the asset data itself is written out.
/// </summary>
internal readonly struct RemoteHostAssetWriter(
    PipeWriter pipeWriter, Scope scope, AssetPath assetPath, ReadOnlyMemory<Checksum> checksums, ISerializerService serializer)
{
    /// <summary>
    /// A sentinel byte we place between messages.  Ensures we can detect when something has gone wrong as soon as
    /// possible. Note: the value we pick is neither ascii nor extended ascii.  So it's very unlikely to appear
    /// accidentally.
    /// </summary>
    public const byte MessageSentinelByte = 0b10010000;

    /// <summary>
    /// Pool of in-memory streams used as the temporary buffer each asset is serialized into before it is copied to
    /// the pipe-writer.
    /// </summary>
    private static readonly ObjectPool<ReadWriteStream> s_streamPool = new(() => new());

    private readonly PipeWriter _pipeWriter = pipeWriter;
    private readonly Scope _scope = scope;
    private readonly AssetPath _assetPath = assetPath;
    private readonly ReadOnlyMemory<Checksum> _checksums = checksums;
    private readonly ISerializerService _serializer = serializer;

    /// <summary>
    /// Runs a producer/consumer pair: the producer finds the requested assets in the scope (see <see
    /// cref="FindAssetsAsync"/>) and the consumer writes each found asset to the pipe-writer (see <see
    /// cref="WriteBatchToPipeAsync"/>).
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to both the producing and consuming sides.</param>
    public Task WriteDataAsync(CancellationToken cancellationToken)
        => ProducerConsumer<ChecksumAndAsset>.RunAsync(
            ProducerConsumerOptions.SingleReaderWriterOptions,
            produceItems: static (onItemFound, @this, cancellationToken) => @this.FindAssetsAsync(onItemFound, cancellationToken),
            consumeItems: static (items, @this, cancellationToken) => @this.WriteBatchToPipeAsync(items, cancellationToken),
            args: this,
            cancellationToken);

    /// <summary>
    /// Asks the scope to find the assets for the requested checksums, reporting each one to <paramref
    /// name="onItemFound"/> as a (checksum, asset) pair.
    /// </summary>
    private Task FindAssetsAsync(Action<ChecksumAndAsset> onItemFound, CancellationToken cancellationToken)
        => _scope.FindAssetsAsync(
            _assetPath, _checksums,
            static (checksum, asset, onItemFound) => onItemFound((checksum, asset)),
            onItemFound, cancellationToken);

    /// <summary>
    /// Writes the solution checksum to the pipe-writer, followed by one message per asset produced by <paramref
    /// name="checksumsAndAssets"/>.  Throws if, without cancellation having been requested, the number of assets
    /// written does not equal the number of checksums that were requested.
    /// </summary>
    private async Task WriteBatchToPipeAsync(
        IAsyncEnumerable<ChecksumAndAsset> checksumsAndAssets, CancellationToken cancellationToken)
    {
        // Get the in-memory buffer and object-writer we'll use to serialize the assets into.  Don't write any
        // validation bytes at this point in time.  We'll write them between each asset we write out.  Using a single
        // object writer across all assets means we get the benefit of string deduplication across all assets we write
        // out.
        using var pooledStream = s_streamPool.GetPooledObject();
        using var objectWriter = new ObjectWriter(pooledStream.Object, leaveOpen: true, writeValidationBytes: false);

        // This information is not actually needed on the receiving end.  However, we still send it so that the receiver
        // can assert that both sides are talking about the same solution snapshot and no weird invariant breaks have
        // occurred.
        _scope.SolutionChecksum.WriteTo(_pipeWriter);
        await _pipeWriter.FlushAsync(cancellationToken).ConfigureAwait(false);

        // Keep track of how many checksums we found.  We must find all the checksums we were asked to find.
        var foundChecksumCount = 0;

        // ConfigureAwait(false) on the enumerable itself so that the implicit MoveNextAsync/DisposeAsync awaits
        // behave like every other await in this type and never resume on a captured context.
        await foreach (var (checksum, asset) in checksumsAndAssets.ConfigureAwait(false))
        {
            await WriteSingleAssetToPipeAsync(
                pooledStream.Object, objectWriter, checksum, asset, cancellationToken).ConfigureAwait(false);
            foundChecksumCount++;
        }

        cancellationToken.ThrowIfCancellationRequested();

        // If we weren't canceled, we better have found and written out all the expected assets.
        Contract.ThrowIfTrue(foundChecksumCount != _checksums.Length);
    }

    /// <summary>
    /// Writes a single message to the pipe-writer: the sentinel byte, the 4-byte length of the serialized asset, and
    /// then the serialized asset itself.  The pipe-writer is flushed once after the length and once after the data.
    /// </summary>
    /// <param name="tempStream">In-memory stream the asset is serialized into before being copied to the pipe.</param>
    /// <param name="objectWriter">Writer over <paramref name="tempStream"/> used to serialize the asset.</param>
    /// <param name="checksum">Checksum identifying <paramref name="asset"/>.</param>
    /// <param name="asset">The asset to write.  Must not be null.</param>
    /// <param name="cancellationToken">Token observed while serializing, flushing and copying.</param>
    private async ValueTask WriteSingleAssetToPipeAsync(
        ReadWriteStream tempStream, ObjectWriter objectWriter, Checksum checksum, object asset, CancellationToken cancellationToken)
    {
        Contract.ThrowIfNull(asset);

        // We're about to send a message.  Write out our sentinel byte to ensure the reading side can detect problems
        // with our writing.
        WriteSentinelByteToPipeWriter();

        // Write the asset to a temporary buffer so we can calculate its length.  Note: as this is an in-memory
        // temporary buffer, we don't have to worry about synchronous writes on it blocking on the pipe-writer. Instead,
        // we'll handle the pipe-writing ourselves afterwards in a completely async fashion.
        WriteAssetToTempStream(tempStream, objectWriter, checksum, asset, cancellationToken);

        // Write the length of the asset to the pipe writer so the reader knows how much data to read.
        WriteTempStreamLengthToPipeWriter(tempStream);

        // Ensure we flush out the length so the reading side can immediately read the header to determine how much
        // data it will need to prebuffer.
        await _pipeWriter.FlushAsync(cancellationToken).ConfigureAwait(false);

        // Now, asynchronously copy the temp buffer over to the writer stream.
        tempStream.Position = 0;
        await tempStream.CopyToAsync(_pipeWriter, cancellationToken).ConfigureAwait(false);

        // We flush after each item as that forms a reasonably sized chunk of data to want to then send over the pipe
        // for the reader on the other side to read.  This allows the item-writing to remain entirely synchronous
        // without any blocking on async flushing, while also ensuring that we're not buffering the entire stream of
        // data into the pipe before it gets sent to the other side.
        await _pipeWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
    }

    /// <summary>
    /// Synchronously serializes one asset into <paramref name="tempStream"/>: the object-writer validation bytes, the
    /// checksum, the asset's kind byte, and then the asset data.  The stream is reset before writing.
    /// </summary>
    private void WriteAssetToTempStream(
        ReadWriteStream tempStream, ObjectWriter objectWriter, Checksum checksum, object asset, CancellationToken cancellationToken)
    {
        // Reset the temp stream to the beginning and clear it out. Don't truncate the stream as we're going to be
        // writing to it multiple times.  This will allow us to reuse the internal chunks of the buffer, without having
        // to reallocate them over and over again. Note: this stream internally keeps a list of byte[]s that it writes
        // to.  Each byte[] is less than the LOH size, so there's no concern about LOH fragmentation here.
        tempStream.Position = 0;
        tempStream.SetLength(0, truncate: false);

        // Write out the object writer validation bytes.  This will help us detect issues when reading if we've screwed
        // something up.
        objectWriter.WriteValidationBytes();

        // Write the checksum for the asset we're writing out, so the other side knows what asset this is.
        checksum.WriteTo(objectWriter);

        // Write out the kind so the receiving end knows how to deserialize this asset.
        objectWriter.WriteByte((byte)asset.GetWellKnownSynchronizationKind());

        // Now serialize out the asset itself.
        _serializer.Serialize(asset, objectWriter, cancellationToken);
    }

    /// <summary>
    /// Writes <see cref="MessageSentinelByte"/> into the pipe-writer's buffer.  Does not flush.
    /// </summary>
    private void WriteSentinelByteToPipeWriter()
    {
        var span = _pipeWriter.GetSpan(1);
        span[0] = MessageSentinelByte;
        _pipeWriter.Advance(1);
    }

    /// <summary>
    /// Writes the current length of <paramref name="tempStream"/> into the pipe-writer's buffer as a little-endian
    /// 32-bit integer.  Throws if the length does not fit in an <see cref="int"/>.  Does not flush.
    /// </summary>
    private void WriteTempStreamLengthToPipeWriter(ReadWriteStream tempStream)
    {
        var length = tempStream.Length;
        Contract.ThrowIfTrue(length > int.MaxValue);

        var span = _pipeWriter.GetSpan(sizeof(int));
        BinaryPrimitives.WriteInt32LittleEndian(span, (int)length);
        _pipeWriter.Advance(sizeof(int));
    }
}