File: IngestionWorker.cs
Web Access
Project: src\playground\AzureKusto\AzureKusto.Worker\AzureKusto.Worker.csproj (AzureKusto.Worker)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Data;
using Kusto.Ingest;
using Microsoft.Extensions.Options;
using Polly;
 
namespace AzureKusto.Worker;
 
internal sealed class IngestionWorker : BackgroundService
{
    private readonly IKustoIngestClient _ingestClient;
    private readonly IOptionsMonitor<WorkerOptions> _workerOptions;
    private readonly ResiliencePipeline _pipeline;
    private readonly ILogger<IngestionWorker> _logger;
 
    public IngestionWorker(
        IKustoIngestClient ingestClient,
        IOptionsMonitor<WorkerOptions> workerOptions,
        [FromKeyedServices("kusto-resilience")] ResiliencePipeline pipeline,
        ILogger<IngestionWorker> logger)
    {
        _ingestClient = ingestClient;
        _workerOptions = workerOptions;
        _pipeline = pipeline;
        _logger = logger;
    }
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting ingestion worker");
 
        // Option: Ingest as part of worker startup
        // This is another approach that is likely more versatile, where seeding occurs as part
        // of startup (optionally only in development).
        await IngestFromDataReaderAsync(stoppingToken);
 
        _logger.LogInformation("Ingestion complete");
        _workerOptions.CurrentValue.IsIngestionComplete = true;
    }
 
    private async Task IngestFromDataReaderAsync(CancellationToken stoppingToken)
    {
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Name", typeof(string));
 
        table.Rows.Add(7, "George");
        table.Rows.Add(8, "Henry");
        table.Rows.Add(9, "Isabel");
 
        var ingestionProps = new KustoIngestionProperties
        {
            DatabaseName = _workerOptions.CurrentValue.DatabaseName,
            TableName = _workerOptions.CurrentValue.TableName,
        };
 
        await _pipeline.ExecuteAsync(async ct =>
        {
            using var reader = table.CreateDataReader();
            await _ingestClient.IngestFromDataReaderAsync(reader, ingestionProps);
        }, stoppingToken);
    }
}