// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Diagnostics;
using Confluent.Kafka;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.Trace;

namespace OpenTelemetry.Instrumentation.ConfluentKafka;
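
/// <summary>
/// An <see cref="IConsumer{TKey,TValue}"/> decorator that wraps every consume call
/// with OpenTelemetry traces and metrics while delegating all other operations to
/// the underlying consumer.
/// </summary>
/// <remarks>
/// A minimal usage sketch (hypothetical wiring; in practice the instrumentation's
/// builder constructs this wrapper):
/// <code>
/// var inner = new ConsumerBuilder&lt;string, string&gt;(config).Build();
/// using var consumer = new InstrumentedConsumer&lt;string, string&gt;(inner, options);
/// var result = consumer.Consume(TimeSpan.FromSeconds(1));
/// </code>
/// </remarks>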
internal sealed class InstrumentedConsumer<TKey, TValue> : IConsumer<TKey, TValue>
{
private readonly IConsumer<TKey, TValue> consumer;
private readonly ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options;
public InstrumentedConsumer(IConsumer<TKey, TValue> consumer, ConfluentKafkaConsumerInstrumentationOptions<TKey, TValue> options)
{
this.consumer = consumer;
this.options = options;
}
public Handle Handle => this.consumer.Handle;
public string Name => this.consumer.Name;
public string MemberId => this.consumer.MemberId;
public List<TopicPartition> Assignment => this.consumer.Assignment;
public List<string> Subscription => this.consumer.Subscription;
public IConsumerGroupMetadata ConsumerGroupMetadata => this.consumer.ConsumerGroupMetadata;
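
// Set externally (e.g., by the instrumentation builder) from the underlying
// consumer configuration; surfaces as the messaging.kafka.consumer.group span tag.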
public string? GroupId { get; internal set; }
public void Dispose()
{
this.consumer.Dispose();
}
public int AddBrokers(string brokers)
{
return this.consumer.AddBrokers(brokers);
}
public void SetSaslCredentials(string username, string password)
{
this.consumer.SetSaslCredentials(username, password);
}
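
// Each Consume overload follows the same pattern: time the delegated call,
// capture the result (or the ConsumeException's partial record) for telemetry,
// and instrument from the finally block so success, timeout (null result), and
// failure paths are all recorded. Partition-EOF pseudo-results are skipped.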
public ConsumeResult<TKey, TValue>? Consume(int millisecondsTimeout)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
try
{
result = this.consumer.Consume(millisecondsTimeout);
consumeResult = ExtractConsumeResult(result);
return result;
}
catch (ConsumeException e)
{
(consumeResult, errorType) = ExtractConsumeResult(e);
throw;
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
if (result is null or { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
}
}
}
public ConsumeResult<TKey, TValue>? Consume(CancellationToken cancellationToken = default)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
try
{
result = this.consumer.Consume(cancellationToken);
consumeResult = ExtractConsumeResult(result);
return result;
}
catch (ConsumeException e)
{
(consumeResult, errorType) = ExtractConsumeResult(e);
throw;
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
if (result is null or { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
}
}
}
public ConsumeResult<TKey, TValue>? Consume(TimeSpan timeout)
{
DateTimeOffset start = DateTimeOffset.UtcNow;
ConsumeResult<TKey, TValue>? result = null;
ConsumeResult consumeResult = default;
string? errorType = null;
try
{
result = this.consumer.Consume(timeout);
consumeResult = ExtractConsumeResult(result);
return result;
}
catch (ConsumeException e)
{
(consumeResult, errorType) = ExtractConsumeResult(e);
throw;
}
finally
{
DateTimeOffset end = DateTimeOffset.UtcNow;
if (result is null or { IsPartitionEOF: false })
{
this.InstrumentConsumption(start, end, consumeResult, errorType);
}
}
}
public void Subscribe(IEnumerable<string> topics)
{
this.consumer.Subscribe(topics);
}
public void Subscribe(string topic)
{
this.consumer.Subscribe(topic);
}
public void Unsubscribe()
{
this.consumer.Unsubscribe();
}
public void Assign(TopicPartition partition)
{
this.consumer.Assign(partition);
}
public void Assign(TopicPartitionOffset partition)
{
this.consumer.Assign(partition);
}
public void Assign(IEnumerable<TopicPartitionOffset> partitions)
{
this.consumer.Assign(partitions);
}
public void Assign(IEnumerable<TopicPartition> partitions)
{
this.consumer.Assign(partitions);
}
public void IncrementalAssign(IEnumerable<TopicPartitionOffset> partitions)
{
this.consumer.IncrementalAssign(partitions);
}
public void IncrementalAssign(IEnumerable<TopicPartition> partitions)
{
this.consumer.IncrementalAssign(partitions);
}
public void IncrementalUnassign(IEnumerable<TopicPartition> partitions)
{
this.consumer.IncrementalUnassign(partitions);
}
public void Unassign()
{
this.consumer.Unassign();
}
public void StoreOffset(ConsumeResult<TKey, TValue> result)
{
this.consumer.StoreOffset(result);
}
public void StoreOffset(TopicPartitionOffset offset)
{
this.consumer.StoreOffset(offset);
}
public List<TopicPartitionOffset> Commit()
{
return this.consumer.Commit();
}
public void Commit(IEnumerable<TopicPartitionOffset> offsets)
{
this.consumer.Commit(offsets);
}
public void Commit(ConsumeResult<TKey, TValue> result)
{
this.consumer.Commit(result);
}
public void Seek(TopicPartitionOffset tpo)
{
this.consumer.Seek(tpo);
}
public void Pause(IEnumerable<TopicPartition> partitions)
{
this.consumer.Pause(partitions);
}
public void Resume(IEnumerable<TopicPartition> partitions)
{
this.consumer.Resume(partitions);
}
public List<TopicPartitionOffset> Committed(TimeSpan timeout)
{
return this.consumer.Committed(timeout);
}
public List<TopicPartitionOffset> Committed(IEnumerable<TopicPartition> partitions, TimeSpan timeout)
{
return this.consumer.Committed(partitions, timeout);
}
public Offset Position(TopicPartition partition)
{
return this.consumer.Position(partition);
}
public List<TopicPartitionOffset> OffsetsForTimes(IEnumerable<TopicPartitionTimestamp> timestampsToSearch, TimeSpan timeout)
{
return this.consumer.OffsetsForTimes(timestampsToSearch, timeout);
}
public WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
{
return this.consumer.GetWatermarkOffsets(topicPartition);
}
public WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
{
return this.consumer.QueryWatermarkOffsets(topicPartition, timeout);
}
public void Close()
{
this.consumer.Close();
}
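
// The helpers below normalize a successful ConsumeResult or a ConsumeException
// into the minimal shape (offset, headers, key, error string) telemetry needs.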
private static string FormatConsumeException(ConsumeException consumeException) =>
$"ConsumeException: {consumeException.Error}";
private static ConsumeResult ExtractConsumeResult(ConsumeResult<TKey, TValue> result) => result switch
{
null => new ConsumeResult(null, null),
{ Message: null } => new ConsumeResult(result.TopicPartitionOffset, null),
_ => new ConsumeResult(result.TopicPartitionOffset, result.Message.Headers, result.Message.Key),
};
private static (ConsumeResult ConsumeResult, string ErrorType) ExtractConsumeResult(ConsumeException exception) => exception switch
{
{ ConsumerRecord: null } => (new ConsumeResult(null, null), FormatConsumeException(exception)),
{ ConsumerRecord.Message: null } => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, null), FormatConsumeException(exception)),
_ => (new ConsumeResult(exception.ConsumerRecord.TopicPartitionOffset, exception.ConsumerRecord.Message.Headers, exception.ConsumerRecord.Message.Key), FormatConsumeException(exception)),
};
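
// Builds the shared receive-operation tag set per the OpenTelemetry messaging
// semantic conventions; partition and error.type are added only when known.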
private static void GetTags(string topic, out TagList tags, int? partition = null, string? errorType = null)
{
tags = new TagList()
{
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingOperation,
ConfluentKafkaCommon.ReceiveOperationName),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingSystem,
ConfluentKafkaCommon.KafkaMessagingSystem),
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingDestinationName,
topic),
};
if (partition is not null)
{
tags.Add(
new KeyValuePair<string, object?>(
SemanticConventions.AttributeMessagingKafkaDestinationPartition,
partition));
}
if (errorType is not null)
{
tags.Add(
new KeyValuePair<string, object?>(
SemanticConventions.AttributeErrorType,
errorType));
}
}
private static void RecordReceive(TopicPartitionOffset? topicPartitionOffset, TimeSpan duration, string? errorType = null)
{
// A timed-out or failed consume may carry no topic/partition information;
// fall back to an empty topic name so the receive attempt is still counted.
GetTags(topicPartitionOffset?.Topic ?? string.Empty, out var tags, partition: topicPartitionOffset?.Partition, errorType: errorType);
ConfluentKafkaCommon.ReceiveMessagesCounter.Add(1, in tags);
ConfluentKafkaCommon.ReceiveDurationHistogram.Record(duration.TotalSeconds, in tags);
}
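
// Emits the receive span and/or metrics for a single consume call, subject to
// the options' Traces/Metrics switches. The end time is set explicitly so the
// span duration covers the blocking Consume call itself.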
private void InstrumentConsumption(DateTimeOffset startTime, DateTimeOffset endTime, ConsumeResult consumeResult, string? errorType)
{
if (this.options.Traces)
{
PropagationContext propagationContext = consumeResult.Headers != null
? OpenTelemetryConsumeResultExtensions.ExtractPropagationContext(consumeResult.Headers)
: default;
using Activity? activity = this.StartReceiveActivity(propagationContext, startTime, consumeResult.TopicPartitionOffset, consumeResult.Key);
if (activity != null)
{
if (errorType != null)
{
activity.SetStatus(ActivityStatusCode.Error);
if (activity.IsAllDataRequested)
{
activity.SetTag(SemanticConventions.AttributeErrorType, errorType);
}
}
activity.SetEndTime(endTime.UtcDateTime);
}
}
if (this.options.Metrics)
{
TimeSpan duration = endTime - startTime;
RecordReceive(consumeResult.TopicPartitionOffset, duration, errorType);
}
}
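
// Receive spans are created without a parent (parentContext: default) and
// linked to the producer's propagated context when one is present, following
// the messaging semantic conventions for receive operations.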
private Activity? StartReceiveActivity(PropagationContext propagationContext, DateTimeOffset start, TopicPartitionOffset? topicPartitionOffset, object? key)
{
var spanName = string.IsNullOrEmpty(topicPartitionOffset?.Topic)
? ConfluentKafkaCommon.ReceiveOperationName
: string.Concat(topicPartitionOffset!.Topic, " ", ConfluentKafkaCommon.ReceiveOperationName);
ActivityLink[] activityLinks = propagationContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagationContext.ActivityContext) }
: Array.Empty<ActivityLink>();
Activity? activity = ConfluentKafkaCommon.ActivitySource.StartActivity(spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: start, parentContext: default);
if (activity?.IsAllDataRequested == true)
{
activity.SetTag(SemanticConventions.AttributeMessagingSystem, ConfluentKafkaCommon.KafkaMessagingSystem);
activity.SetTag(SemanticConventions.AttributeMessagingClientId, this.Name);
activity.SetTag(SemanticConventions.AttributeMessagingDestinationName, topicPartitionOffset?.Topic);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaDestinationPartition, topicPartitionOffset?.Partition.Value);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageOffset, topicPartitionOffset?.Offset.Value);
activity.SetTag(SemanticConventions.AttributeMessagingKafkaConsumerGroup, this.GroupId);
activity.SetTag(SemanticConventions.AttributeMessagingOperation, ConfluentKafkaCommon.ReceiveOperationName);
if (key != null)
{
activity.SetTag(SemanticConventions.AttributeMessagingKafkaMessageKey, key);
}
}
return activity;
}
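
// Immutable snapshot of one consume outcome so telemetry can be emitted
// uniformly for successful results and for ConsumeExceptions.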
private readonly record struct ConsumeResult(
TopicPartitionOffset? TopicPartitionOffset,
Headers? Headers,
object? Key = null)
{
public object? Key { get; } = Key;
public Headers? Headers { get; } = Headers;
public TopicPartitionOffset? TopicPartitionOffset { get; } = TopicPartitionOffset;
}
}