File: AspireKafkaProducerExtensions.cs
Web Access
Project: src\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj (Aspire.Confluent.Kafka)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using Aspire;
using Aspire.Confluent.Kafka;
using Confluent.Kafka;
using HealthChecks.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
 
namespace Microsoft.Extensions.Hosting;
 
/// <summary>
/// Extension methods for connecting to a Kafka broker.
/// </summary>
public static class AspireKafkaProducerExtensions
{
    // Root configuration section the producer settings are bound from. A sub-section named after the
    // connection name is bound on top of it (see BuildProducerSettings).
    private const string DefaultConfigSectionName = "Aspire:Confluent:Kafka:Producer";

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, null, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings)
        => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, null, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, Wrap(configureBuilder), connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, null, configureBuilder, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
        => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, Wrap(configureBuilder), connectionName, serviceKey: null);

    /// <summary>
    /// Registers <see cref="IProducer{TKey,TValue}"/> as a singleton in the services provided by the <paramref name="builder"/>.
    /// </summary>
    /// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
    /// <param name="connectionName">A name used to retrieve the connection string from the ConnectionStrings configuration section.</param>
    /// <param name="configureSettings">An optional method used for customizing the <see cref="KafkaProducerSettings"/>.</param>
    /// <param name="configureBuilder">An optional method used for customizing the <see cref="ProducerBuilder{TKey,TValue}"/>.</param>
    /// <remarks>Reads the configuration from the "Aspire:Confluent:Kafka:Producer" section, then applies the values of its "{connectionName}" sub-section on top.</remarks>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="builder"/> or <paramref name="connectionName"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException">Thrown when <paramref name="connectionName"/> is empty.</exception>
    public static void AddKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string connectionName, Action<KafkaProducerSettings>? configureSettings, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
       => AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, configureBuilder, connectionName, serviceKey: null);

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, null, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, null, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, Wrap(configureBuilder), connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, null, configureBuilder, connectionName: name, serviceKey: name);
    }

    /// <inheritdoc cref="AddKeyedKafkaProducer{TKey, TValue}(IHostApplicationBuilder, string, Action{KafkaProducerSettings}?, Action{IServiceProvider, ProducerBuilder{TKey, TValue}}?)"/>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings, Action<ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, Wrap(configureBuilder), connectionName: name, serviceKey: name);
    }

    /// <summary>
    /// Registers <see cref="IProducer{TKey,TValue}"/> as a keyed singleton for the given <paramref name="name"/> in the services provided by the <paramref name="builder"/>.
    /// </summary>
    /// <param name="builder">The <see cref="IHostApplicationBuilder" /> to read config from and add services to.</param>
    /// <param name="name">The name of the component, which is used as the <see cref="ServiceDescriptor.ServiceKey"/> of the service and also to retrieve the connection string from the ConnectionStrings configuration section.</param>
    /// <param name="configureSettings">An optional method used for customizing the <see cref="KafkaProducerSettings"/>.</param>
    /// <param name="configureBuilder">An optional method used for customizing the <see cref="ProducerBuilder{TKey,TValue}"/>.</param>
    /// <remarks>Reads the configuration from the "Aspire:Confluent:Kafka:Producer" section, then applies the values of the "Aspire:Confluent:Kafka:Producer:{name}" section on top.</remarks>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="builder"/> or <paramref name="name"/> is <see langword="null"/>.</exception>
    /// <exception cref="ArgumentException">Thrown when <paramref name="name"/> is empty.</exception>
    public static void AddKeyedKafkaProducer<TKey, TValue>(this IHostApplicationBuilder builder, string name, Action<KafkaProducerSettings>? configureSettings, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        AddKafkaProducerInternal<TKey, TValue>(builder, configureSettings, configureBuilder, connectionName: name, serviceKey: name);
    }

    /// <summary>
    /// Shared implementation behind every <c>AddKafkaProducer</c> / <c>AddKeyedKafkaProducer</c> overload:
    /// builds the settings, registers the connection factory and the producer (keyed when
    /// <paramref name="serviceKey"/> is not null), and optionally wires up metrics and a health check.
    /// </summary>
    /// <param name="builder">The host application builder to read configuration from and add services to.</param>
    /// <param name="configureSettings">Optional callback applied to the settings after configuration binding.</param>
    /// <param name="configureBuilder">Optional callback applied to the <see cref="ProducerBuilder{TKey,TValue}"/> when the factory is resolved.</param>
    /// <param name="connectionName">Name of the connection string and of the named configuration sub-section.</param>
    /// <param name="serviceKey">Service key for keyed registration, or <see langword="null"/> for a non-keyed registration.</param>
    private static void AddKafkaProducerInternal<TKey, TValue>(
        IHostApplicationBuilder builder,
        Action<KafkaProducerSettings>? configureSettings,
        Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder,
        string connectionName,
        string? serviceKey)
    {
        ArgumentNullException.ThrowIfNull(builder);

        // The keyed overloads validate their `name` before calling in; validating here as well makes the
        // non-keyed overloads fail fast on a missing connection name, consistently with the keyed ones.
        ArgumentException.ThrowIfNullOrEmpty(connectionName);

        // Settings are resolved eagerly at registration time and captured by the factory lambdas below.
        var settings = BuildProducerSettings(builder, configureSettings, connectionName);

        if (serviceKey is null)
        {
            builder.Services.AddSingleton<ProducerConnectionFactory<TKey, TValue>>(sp => CreateProducerConnectionFactory<TKey, TValue>(sp, configureBuilder, settings));
            builder.Services.AddSingleton<IProducer<TKey, TValue>>(sp => sp.GetRequiredService<ProducerConnectionFactory<TKey, TValue>>().Create());
        }
        else
        {
            builder.Services.AddKeyedSingleton<ProducerConnectionFactory<TKey, TValue>>(serviceKey, (sp, key) => CreateProducerConnectionFactory<TKey, TValue>(sp, configureBuilder, settings));
            builder.Services.AddKeyedSingleton<IProducer<TKey, TValue>>(serviceKey, (sp, key) => sp.GetRequiredKeyedService<ProducerConnectionFactory<TKey, TValue>>(key).Create());
        }

        if (!settings.DisableMetrics)
        {
            // The metrics plumbing is shared: the channel receives the statistics JSON written by the
            // producer's statistics handler (see CreateProducerBuilder).
            builder.Services.TryAddSingleton<MetricsChannel>();
            builder.Services.AddHostedService<MetricsService>();
            builder.Services.TryAddSingleton<ConfluentKafkaMetrics>();
            builder.Services.AddOpenTelemetry().WithMetrics(metricBuilderProvider => metricBuilderProvider.AddMeter(ConfluentKafkaCommon.MeterName));
        }

        if (!settings.DisableHealthChecks)
        {
            // Keyed registrations get a per-connection health check name; non-keyed ones share a fixed name.
            string healthCheckName = serviceKey is null
                ? ConfluentKafkaCommon.ProducerHealthCheckName
                : string.Concat(ConfluentKafkaCommon.KeyedProducerHealthCheckName, connectionName);

            builder.Services.TryAddKeyedSingleton<KafkaHealthCheck>(healthCheckName,
                (sp, _) =>
                {
                    var connectionFactory = serviceKey is null
                        ? sp.GetRequiredService<ProducerConnectionFactory<TKey, TValue>>()
                        : sp.GetRequiredKeyedService<ProducerConnectionFactory<TKey, TValue>>(serviceKey);

                    // The health check uses its own ProducerConfig built from the factory's config entries,
                    // with 1000 ms socket/message timeouts and the statistics interval set to 0.
                    var options = new KafkaHealthCheckOptions();
                    options.Configuration = new ProducerConfig(connectionFactory.Config.ToDictionary());
                    options.Configuration.SocketTimeoutMs = 1000;
                    options.Configuration.MessageTimeoutMs = 1000;
                    options.Configuration.StatisticsIntervalMs = 0;
                    return new KafkaHealthCheck(options);
                });

            builder.TryAddHealthCheck(new HealthCheckRegistration(healthCheckName,
                sp => sp.GetRequiredKeyedService<KafkaHealthCheck>(healthCheckName),
                failureStatus: default,
                tags: default));
        }
    }

    /// <summary>
    /// Creates the <see cref="ProducerConnectionFactory{TKey,TValue}"/> from a configured producer builder
    /// and the settings' <see cref="KafkaProducerSettings.Config"/>.
    /// </summary>
    private static ProducerConnectionFactory<TKey, TValue> CreateProducerConnectionFactory<TKey, TValue>(IServiceProvider serviceProvider, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder, KafkaProducerSettings settings)
        => new(CreateProducerBuilder(serviceProvider, configureBuilder, settings), settings.Config);

    /// <summary>
    /// Validates the settings and creates a <see cref="ProducerBuilder{TKey,TValue}"/>, applying the
    /// caller's <paramref name="configureBuilder"/> callback first and then attaching the log handler and,
    /// unless metrics are disabled, the statistics handler.
    /// </summary>
    /// <param name="serviceProvider">Provider used to resolve <see cref="ILoggerFactory"/> and, when metrics are enabled, <see cref="MetricsChannel"/>.</param>
    /// <param name="configureBuilder">Optional caller customization of the producer builder.</param>
    /// <param name="settings">The producer settings; validated on entry.</param>
    /// <returns>The configured producer builder.</returns>
    private static ProducerBuilder<TKey, TValue> CreateProducerBuilder<TKey, TValue>(IServiceProvider serviceProvider, Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? configureBuilder, KafkaProducerSettings settings)
    {
        settings.Validate();

        ProducerBuilder<TKey, TValue> builder = new(settings.Config);
        ILogger logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger(ConfluentKafkaCommon.LogCategoryName);

        // The caller's customization runs before the handlers below are attached. If it already installed
        // a handler, the InvalidOperationException raised by the builder is caught and only a warning is logged.
        configureBuilder?.Invoke(serviceProvider, builder);

        try
        {
            // NOTE(review): the event id is derived from Facility's hash code; if Facility is a string this
            // value is presumably not stable across processes - confirm before relying on it for filtering.
            void OnLog(IProducer<TKey, TValue> _, LogMessage logMessage) =>
               logger.Log((LogLevel)logMessage.LevelAs(LogLevelType.MicrosoftExtensionsLogging), logMessage.Facility?.GetHashCode() ?? 0, logMessage.Message, null, static (value, ex) => value);

            builder.SetLogHandler(OnLog);
        }
        catch (InvalidOperationException)
        {
            logger.LogWarning("LogHandler is already set. Skipping... No logs will be written.");
        }

        if (!settings.DisableMetrics)
        {
            MetricsChannel channel = serviceProvider.GetRequiredService<MetricsChannel>();
            void OnStatistics(IProducer<TKey, TValue> _, string json)
            {
                if (string.IsNullOrEmpty(json))
                {
                    return;
                }

                // StatisticsHandler is called on the producer poll thread, we need to offload the processing
                // to avoid slowing the producer down.
                channel.Writer.TryWrite(json);
            }

            try
            {
                builder.SetStatisticsHandler(OnStatistics);
            }
            catch (InvalidOperationException)
            {
                logger.LogWarning("StatisticsHandler is already set. Skipping... No metrics will be exposed.");
            }
        }
        return builder;
    }

    /// <summary>
    /// Builds the <see cref="KafkaProducerSettings"/> for a connection. Sources are applied in this order,
    /// later ones overriding earlier ones: the root configuration section, the sub-section named
    /// <paramref name="connectionName"/>, the connection string named <paramref name="connectionName"/>
    /// (when present), and finally the <paramref name="configureSettings"/> callback.
    /// </summary>
    /// <param name="builder">The host application builder whose configuration is read.</param>
    /// <param name="configureSettings">Optional callback applied after configuration binding.</param>
    /// <param name="connectionName">Name of the configuration sub-section and of the connection string.</param>
    /// <returns>The consolidated settings.</returns>
    private static KafkaProducerSettings BuildProducerSettings(IHostApplicationBuilder builder, Action<KafkaProducerSettings>? configureSettings, string connectionName)
    {
        var configSection = builder.Configuration.GetSection(DefaultConfigSectionName);
        var namedConfigSection = configSection.GetSection(connectionName);
        KafkaProducerSettings settings = new();
        configSection.Bind(settings);
        namedConfigSection.Bind(settings);

        // Manually bind the ProducerConfig until https://github.com/dotnet/runtime/issues/96652 is fixed
        configSection.GetSection(nameof(KafkaProducerSettings.Config)).Bind(settings.Config);
        namedConfigSection.GetSection(nameof(KafkaProducerSettings.Config)).Bind(settings.Config);

        if (builder.Configuration.GetConnectionString(connectionName) is string connectionString)
        {
            settings.ConnectionString = connectionString;
        }

        configureSettings?.Invoke(settings);

        settings.Consolidate();
        return settings;
    }

    /// <summary>
    /// Adapts a builder-only callback to the (service provider, builder) callback shape used internally;
    /// the service provider argument is ignored. Returns <see langword="null"/> when <paramref name="action"/> is null.
    /// </summary>
    private static Action<IServiceProvider, ProducerBuilder<TKey, TValue>>? Wrap<TKey, TValue>(Action<ProducerBuilder<TKey, TValue>>? action)
    {
        if (action is null)
        {
            return null;
        }

        return (_, builder) => action(builder);
    }
}