File: ConfluentKafkaMetrics.cs
Web Access
Project: src\src\Components\Aspire.Confluent.Kafka\Aspire.Confluent.Kafka.csproj (Aspire.Confluent.Kafka)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Diagnostics.Metrics;
 
namespace Aspire.Confluent.Kafka;
 
internal sealed class ConfluentKafkaMetrics
{
    private readonly Meter _meter;
 
    public Counter<long> Tx { get; }
    public Counter<long> TxBytes { get; }
    public Counter<long> TxMessages { get; }
    public Counter<long> TxMessageBytes { get; }
    public Counter<long> Rx { get; }
    public Counter<long> RxBytes { get; }
    public Counter<long> RxMessages { get; }
    public Counter<long> RxMessageBytes { get; }
 
    public ConcurrentQueue<Measurement<long>> ReplyQueueMeasurements { get; } = new ConcurrentQueue<Measurement<long>>();
    public ConcurrentQueue<Measurement<long>> MessageCountMeasurements { get; } = new ConcurrentQueue<Measurement<long>>();
    public ConcurrentQueue<Measurement<long>> MessageSizeMeasurements { get; } = new ConcurrentQueue<Measurement<long>>();
 
    public ConfluentKafkaMetrics(IMeterFactory meterFactory)
    {
        _meter = meterFactory.Create(ConfluentKafkaCommon.MeterName);
 
        _meter.CreateObservableGauge(Gauges.ReplyQueue, GetReplyQMeasurements, description: Descriptions.ReplyQueue);
        _meter.CreateObservableGauge(Gauges.MessageCount, GetMessageCountMeasurements, description: Descriptions.MessageCount);
        _meter.CreateObservableGauge(Gauges.MessageSize, GetMessageSizeMeasurements, description: Descriptions.MessageSize);
 
        Tx = _meter.CreateCounter<long>(Counters.Tx, description: Descriptions.Tx);
        TxBytes = _meter.CreateCounter<long>(Counters.TxBytes, description: Descriptions.TxBytes);
        TxMessages = _meter.CreateCounter<long>(Counters.TxMessages, description: Descriptions.TxMessages);
        TxMessageBytes = _meter.CreateCounter<long>(Counters.TxMessageBytes, description: Descriptions.TxMessageBytes);
        Rx = _meter.CreateCounter<long>(Counters.Rx, description: Descriptions.Rx);
        RxBytes = _meter.CreateCounter<long>(Counters.RxBytes, description: Descriptions.RxBytes);
        RxMessages = _meter.CreateCounter<long>(Counters.RxMessages, description: Descriptions.RxMessages);
        RxMessageBytes = _meter.CreateCounter<long>(Counters.RxMessageBytes, description: Descriptions.RxMessageBytes);
    }
 
    public static class Gauges
    {
        public const string ReplyQueue = "messaging.kafka.consumer.queue.message_count";
        public const string MessageCount = "messaging.kafka.producer.queue.message_count";
        public const string MessageSize = "messaging.kafka.producer.queue.size";
    }
 
    public static class Counters
    {
        public const string Tx = "messaging.kafka.network.tx";
        public const string TxBytes = "messaging.kafka.network.transmitted";
        public const string Rx = "messaging.kafka.network.rx";
        public const string RxBytes = "messaging.kafka.network.received";
        public const string TxMessages = "messaging.publish.messages";
        public const string TxMessageBytes = "messaging.kafka.message.transmitted";
        public const string RxMessages = "messaging.receive.messages";
        public const string RxMessageBytes = "messaging.kafka.message.received";
    }
 
    public static class Tags
    {
        public const string ClientId = "messaging.client_id";
        public const string Type = "type";
        public const string Name = "name";
    }
 
    private static class Descriptions
    {
        public const string ReplyQueue = "Number of ops (callbacks, events, etc) waiting in queue for application to serve with rd_kafka_poll()";
        public const string MessageCount = "Current number of messages in producer queues";
        public const string MessageSize = "Current total size of messages in producer queues";
        public const string Tx = "Total number of requests sent to Kafka brokers";
        public const string TxBytes = "Total number of bytes transmitted to Kafka brokers";
        public const string Rx = "Total number of responses received from Kafka brokers";
        public const string RxBytes = "Total number of bytes received from Kafka brokers";
        public const string TxMessages = "Total number of messages transmitted (produced) to Kafka brokers";
        public const string TxMessageBytes = "Total number of message bytes (including framing, such as per-Message framing and MessageSet/batch framing) transmitted to Kafka brokers";
        public const string RxMessages = "Total number of messages consumed, not including ignored messages (due to offset, etc), from Kafka brokers";
        public const string RxMessageBytes = "Total number of message bytes (including framing) received from Kafka brokers";
    }
 
    private IEnumerable<Measurement<long>> GetReplyQMeasurements()
    {
        while (ReplyQueueMeasurements.TryDequeue(out var measurement))
        {
            yield return measurement;
        }
    }
 
    private IEnumerable<Measurement<long>> GetMessageCountMeasurements()
    {
        while (MessageCountMeasurements.TryDequeue(out var measurement))
        {
            yield return measurement;
        }
    }
 
    private IEnumerable<Measurement<long>> GetMessageSizeMeasurements()
    {
        while (MessageSizeMeasurements.TryDequeue(out var measurement))
        {
            yield return measurement;
        }
    }
}