File: Model\TelemetryImportService.cs
Web Access
Project: src\src\Aspire.Dashboard\Aspire.Dashboard.csproj (Aspire.Dashboard)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.IO.Compression;
using System.Text.Json;
using Aspire.Dashboard.Configuration;
using Aspire.Dashboard.Otlp.Model;
using Aspire.Dashboard.Otlp.Model.Serialization;
using Aspire.Dashboard.Otlp.Storage;
using Microsoft.Extensions.Options;
 
namespace Aspire.Dashboard.Model;
 
/// <summary>
/// Service for importing telemetry data from files.
/// </summary>
public sealed class TelemetryImportService
{
    private readonly TelemetryRepository _telemetryRepository;
    private readonly IOptionsMonitor<DashboardOptions> _options;
    private readonly ILogger<TelemetryImportService> _logger;
 
    /// <summary>
    /// Gets a value indicating whether import is enabled.
    /// </summary>
    public bool IsImportEnabled => _options.CurrentValue.UI.DisableImport != true;
 
    /// <summary>
    /// Initializes a new instance of the <see cref="TelemetryImportService"/> class.
    /// </summary>
    /// <param name="telemetryRepository">The telemetry repository.</param>
    /// <param name="options">The dashboard options.</param>
    /// <param name="logger">The logger.</param>
    public TelemetryImportService(TelemetryRepository telemetryRepository, IOptionsMonitor<DashboardOptions> options, ILogger<TelemetryImportService> logger)
    {
        _telemetryRepository = telemetryRepository;
        _options = options;
        _logger = logger;
    }
 
    /// <summary>
    /// Imports telemetry data from a file stream.
    /// </summary>
    /// <param name="fileName">The name of the file being imported.</param>
    /// <param name="stream">The file stream.</param>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>A task representing the async operation.</returns>
    /// <exception cref="InvalidOperationException">Thrown when import is disabled.</exception>
    public async Task ImportAsync(string fileName, Stream stream, CancellationToken cancellationToken)
    {
        if (!IsImportEnabled)
        {
            throw new InvalidOperationException("Import is disabled.");
        }
 
        await ImportCoreAsync(fileName, stream, allowZipFile: true, cancellationToken).ConfigureAwait(false);
    }
 
    private async Task ImportCoreAsync(string fileName, Stream stream, bool allowZipFile, CancellationToken cancellationToken)
    {
        var extension = Path.GetExtension(fileName).ToLowerInvariant();
 
        switch (extension)
        {
            case ".zip":
                if (!allowZipFile)
                {
                    // Allowing zip file is a flag to not extract zip files inside zip files. Avoid unexpected recursion.
                    goto default;
                }
 
                // Copy input stream to MemoryStream because ZipArchive requires synchronous reads and seeking.
                var memoryStream = new MemoryStream();
                await stream.CopyToAsync(memoryStream, cancellationToken).ConfigureAwait(false);
                memoryStream.Position = 0;
 
                await ImportZipAsync(memoryStream, cancellationToken).ConfigureAwait(false);
                break;
            case ".json":
                await ImportJsonAsync(fileName, stream, cancellationToken).ConfigureAwait(false);
                break;
            case ".txt":
                // Text files are console logs - currently ignored as per requirements
                _logger.LogDebug("Ignoring text file {FileName} - console log import not supported", fileName);
                break;
            default:
                _logger.LogDebug("Unsupported file type: {Extension}", extension);
                break;
        }
    }
 
    private async Task ImportZipAsync(Stream stream, CancellationToken cancellationToken)
    {
        using var archive = new ZipArchive(stream, ZipArchiveMode.Read, leaveOpen: true);
 
        foreach (var entry in archive.Entries)
        {
            using var entryStream = entry.Open();
            await ImportCoreAsync(entry.Name, entryStream, allowZipFile: false, cancellationToken).ConfigureAwait(false);
        }
    }
 
    private async Task ImportJsonAsync(string fileName, Stream stream, CancellationToken cancellationToken)
    {
        // Read the JSON content
        using var reader = new StreamReader(stream);
        var jsonContent = await reader.ReadToEndAsync(cancellationToken).ConfigureAwait(false);
 
        if (string.IsNullOrWhiteSpace(jsonContent))
        {
            _logger.LogWarning("Empty JSON file: {FileName}", fileName);
            return;
        }
 
        OtlpTelemetryDataJson? telemetryData;
        try
        {
            telemetryData = JsonSerializer.Deserialize<OtlpTelemetryDataJson>(jsonContent, OtlpJsonSerializerContext.DefaultOptions);
        }
        catch (JsonException ex)
        {
            _logger.LogWarning(ex, "Failed to deserialize JSON file: {FileName}", fileName);
            return;
        }
 
        if (telemetryData is null)
        {
            _logger.LogWarning("Could not deserialize telemetry data from file: {FileName}", fileName);
            return;
        }
 
        var imported = false;
 
        if (telemetryData.ResourceLogs is { Length: > 0 })
        {
            ImportLogs(telemetryData.ResourceLogs);
            _logger.LogDebug("Imported logs from {FileName}", fileName);
            imported = true;
        }
 
        if (telemetryData.ResourceSpans is { Length: > 0 })
        {
            ImportTraces(telemetryData.ResourceSpans);
            _logger.LogDebug("Imported traces from {FileName}", fileName);
            imported = true;
        }
 
        if (telemetryData.ResourceMetrics is { Length: > 0 })
        {
            ImportMetrics(telemetryData.ResourceMetrics);
            _logger.LogDebug("Imported metrics from {FileName}", fileName);
            imported = true;
        }
 
        if (!imported)
        {
            _logger.LogWarning("No telemetry data found in file: {FileName}", fileName);
        }
    }
 
    private void ImportLogs(OtlpResourceLogsJson[] resourceLogs)
    {
        var exportRequest = new OtlpExportLogsServiceRequestJson { ResourceLogs = resourceLogs };
        var protobufRequest = OtlpJsonToProtobufConverter.ToProtobuf(exportRequest);
 
        var addContext = new AddContext();
        _telemetryRepository.AddLogs(addContext, protobufRequest.ResourceLogs);
 
        _logger.LogDebug("Imported logs: {SuccessCount} succeeded, {FailureCount} failed", addContext.SuccessCount, addContext.FailureCount);
    }
 
    private void ImportTraces(OtlpResourceSpansJson[] resourceSpans)
    {
        var exportRequest = new OtlpExportTraceServiceRequestJson { ResourceSpans = resourceSpans };
        var protobufRequest = OtlpJsonToProtobufConverter.ToProtobuf(exportRequest);
 
        var addContext = new AddContext();
        _telemetryRepository.AddTraces(addContext, protobufRequest.ResourceSpans);
 
        _logger.LogDebug("Imported traces: {SuccessCount} succeeded, {FailureCount} failed", addContext.SuccessCount, addContext.FailureCount);
    }
 
    private void ImportMetrics(OtlpResourceMetricsJson[] resourceMetrics)
    {
        var exportRequest = new OtlpExportMetricsServiceRequestJson { ResourceMetrics = resourceMetrics };
        var protobufRequest = OtlpJsonToProtobufConverter.ToProtobuf(exportRequest);
 
        var addContext = new AddContext();
        _telemetryRepository.AddMetrics(addContext, protobufRequest.ResourceMetrics);
 
        _logger.LogDebug("Imported metrics: {SuccessCount} succeeded, {FailureCount} failed", addContext.SuccessCount, addContext.FailureCount);
    }
}