File: src\Vendoring\OpenTelemetry.Instrumentation.ConfluentKafka\InstrumentedProducer.cs
Web Access
Project: src\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj (Aspire.Confluent.Kafka)
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
 
using System.Diagnostics;
using System.Text;
using Confluent.Kafka;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;
 
namespace OpenTelemetry.Instrumentation.ConfluentKafka;
 
internal sealed class InstrumentedProducer<TKey, TValue> : IProducer<TKey, TValue>
{
    private readonly TextMapPropagator propagator = Propagators.DefaultTextMapPropagator;
    private readonly IProducer<TKey, TValue> producer;
    private readonly ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options;
 
    public InstrumentedProducer(
        IProducer<TKey, TValue> producer,
        ConfluentKafkaProducerInstrumentationOptions<TKey, TValue> options)
    {
        this.producer = producer;
        this.options = options;
    }
 
    public Handle Handle => this.producer.Handle;
 
    public string Name => this.producer.Name;
 
    public int AddBrokers(string brokers)
    {
        return this.producer.AddBrokers(brokers);
    }
 
    public void SetSaslCredentials(string username, string password)
    {
        this.producer.SetSaslCredentials(username, password);
    }
 
    public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
        string topic,
        Message<TKey, TValue> message,
        CancellationToken cancellationToken = default)
    {
        DateTimeOffset start = DateTimeOffset.UtcNow;
        using Activity? activity = this.StartPublishActivity(start, topic, message);
        if (activity != null)
        {
            this.InjectActivity(activity, message);
        }
 
        DeliveryResult<TKey, TValue> result;
        string? errorType = null;
        try
        {
            result = await this.producer.ProduceAsync(topic, message, cancellationToken).ConfigureAwait(false);
        }
        catch (ProduceException<TKey, TValue> produceException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException));
 
            throw;
        }
        catch (ArgumentException argumentException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException));
 
            throw;
        }
        finally
        {
            DateTimeOffset end = DateTimeOffset.UtcNow;
            activity?.SetEndTime(end.UtcDateTime);
            TimeSpan duration = end - start;
 
            if (this.options.Metrics)
            {
                RecordPublish(topic, duration, errorType);
            }
        }
 
        return result;
    }
 
    public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
        TopicPartition topicPartition,
        Message<TKey, TValue> message,
        CancellationToken cancellationToken = default)
    {
        DateTimeOffset start = DateTimeOffset.UtcNow;
        using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
        if (activity != null)
        {
            this.InjectActivity(activity, message);
        }
 
        DeliveryResult<TKey, TValue> result;
        string? errorType = null;
        try
        {
            result = await this.producer.ProduceAsync(topicPartition, message, cancellationToken).ConfigureAwait(false);
        }
        catch (ProduceException<TKey, TValue> produceException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException));
 
            throw;
        }
        catch (ArgumentException argumentException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException));
 
            throw;
        }
        finally
        {
            DateTimeOffset end = DateTimeOffset.UtcNow;
            activity?.SetEndTime(end.UtcDateTime);
            TimeSpan duration = end - start;
 
            if (this.options.Metrics)
            {
                RecordPublish(topicPartition, duration, errorType);
            }
        }
 
        return result;
    }
 
    public void Produce(string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
    {
        DateTimeOffset start = DateTimeOffset.UtcNow;
        using Activity? activity = this.StartPublishActivity(start, topic, message);
        if (activity != null)
        {
            this.InjectActivity(activity, message);
        }
 
        string? errorType = null;
        try
        {
            this.producer.Produce(topic, message, deliveryHandler);
        }
        catch (ProduceException<TKey, TValue> produceException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException));
 
            throw;
        }
        catch (ArgumentException argumentException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException));
 
            throw;
        }
        finally
        {
            DateTimeOffset end = DateTimeOffset.UtcNow;
            activity?.SetEndTime(end.UtcDateTime);
            TimeSpan duration = end - start;
 
            if (this.options.Metrics)
            {
                RecordPublish(topic, duration, errorType);
            }
        }
    }
 
    public void Produce(TopicPartition topicPartition, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>>? deliveryHandler = null)
    {
        DateTimeOffset start = DateTimeOffset.UtcNow;
        using Activity? activity = this.StartPublishActivity(start, topicPartition.Topic, message, topicPartition.Partition);
        if (activity != null)
        {
            this.InjectActivity(activity, message);
        }
 
        string? errorType = null;
        try
        {
            this.producer.Produce(topicPartition, message, deliveryHandler);
        }
        catch (ProduceException<TKey, TValue> produceException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatProduceException(produceException));
 
            throw;
        }
        catch (ArgumentException argumentException)
        {
            activity?.SetStatus(ActivityStatusCode.Error);
            activity?.SetTag(SemanticConventions.AttributeErrorType, errorType = FormatArgumentException(argumentException));
 
            throw;
        }
        finally
        {
            DateTimeOffset end = DateTimeOffset.UtcNow;
            activity?.SetEndTime(end.UtcDateTime);
            TimeSpan duration = end - start;
 
            if (this.options.Metrics)
            {
                RecordPublish(topicPartition, duration, errorType);
            }
        }
    }
 
    public int Poll(TimeSpan timeout)
    {
        return this.producer.Poll(timeout);
    }
 
    public int Flush(TimeSpan timeout)
    {
        return this.producer.Flush(timeout);
    }
 
    public void Flush(CancellationToken cancellationToken = default)
    {
        this.producer.Flush(cancellationToken);
    }
 
    public void InitTransactions(TimeSpan timeout)
    {
        this.producer.InitTransactions(timeout);
    }
 
    public void BeginTransaction()
    {
        this.producer.BeginTransaction();
    }
 
    public void CommitTransaction(TimeSpan timeout)
    {
        this.producer.CommitTransaction(timeout);
    }
 
    public void CommitTransaction()
    {
        this.producer.CommitTransaction();
    }
 
    public void AbortTransaction(TimeSpan timeout)
    {
        this.producer.AbortTransaction(timeout);
    }
 
    public void AbortTransaction()
    {
        this.producer.AbortTransaction();
    }
 
    public void SendOffsetsToTransaction(IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout)
    {
        this.producer.SendOffsetsToTransaction(offsets, groupMetadata, timeout);
    }
 
    public void Dispose()
    {
        this.producer.Dispose();
    }
 
    private static string FormatProduceException(ProduceException<TKey, TValue> produceException) =>
        $"ProduceException: {produceException.Error.Code}";
 
    private static string FormatArgumentException(ArgumentException argumentException) =>
        $"ArgumentException: {argumentException.ParamName}";
 
    private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null)
    {
        tags = new TagList()
        {
            new KeyValuePair<string, object?>(
                SemanticConventions.AttributeMessagingOperation,
                ConfluentKafkaCommon.PublishOperationName),
            new KeyValuePair<string, object?>(
                SemanticConventions.AttributeMessagingSystem,
                ConfluentKafkaCommon.KafkaMessagingSystem),
            new KeyValuePair<string, object?>(
                SemanticConventions.AttributeMessagingDestinationName,
                topic),
        };
 
        if (partition is not null)
        {
            tags.Add(
                new KeyValuePair<string, object?>(
                    SemanticConventions.AttributeMessagingKafkaDestinationPartition,
                    partition));
        }
 
        if (errorType is not null)
        {
            tags.Add(
                new KeyValuePair<string, object?>(
                    SemanticConventions.AttributeErrorType,
                    errorType));
        }
    }
 
    private static void RecordPublish(string topic, TimeSpan duration, string? errorType = null)
    {
        GetTags(topic, out var tags, partition: null, errorType);
 
        ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags);
        ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags);
    }
 
    private static void RecordPublish(TopicPartition topicPartition, TimeSpan duration, string? errorType = null)
    {
        GetTags(topicPartition.Topic, out var tags, partition: topicPartition.Partition, errorType);
 
        ConfluentKafkaCommon.PublishMessagesCounter.Add(1, in tags);
        ConfluentKafkaCommon.PublishDurationHistogram.Record(duration.TotalSeconds, in tags);
    }
 
    private Activity? StartPublishActivity(DateTimeOffset start, string topic, Message<TKey, TValue> message, int? partition = null)
    {
        if (!this.options.Traces)
        {
            return null;
        }
 
        var spanName = string.Concat(topic, " ", ConfluentKafkaCommon.PublishOperationName);
        var activity = ConfluentKafkaCommon.ActivitySource.StartActivity(name: spanName, kind: ActivityKind.Producer, startTime: start);
        if (activity == null)
        {
            return null;
        }
 
        if (activity.IsAllDataRequested)
        {
            activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
            activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name);
            activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topic);
            activity.SetTag(SemanticConventions.AttributeMessagingOperation, ConfluentKafkaCommon.PublishOperationName);
 
            if (message.Key != null)
            {
                activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, message.Key);
            }
 
            if (partition is not null)
            {
                activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, partition);
            }
        }
 
        return activity;
    }
 
    private void InjectActivity(Activity? activity, Message<TKey, TValue> message)
    {
        this.propagator.Inject(new PropagationContext(activity?.Context ?? default, Baggage.Current), message, this.InjectTraceContext);
    }
 
    private void InjectTraceContext(Message<TKey, TValue> message, string key, string value)
    {
        message.Headers ??= new Headers();
        message.Headers.Add(key, Encoding.UTF8.GetBytes(value));
    }
}