File: src\Vendoring\OpenTelemetry.Instrumentation.StackExchangeRedis\StackExchangeRedisConnectionInstrumentation.cs
Web Access
Project: src\src\Components\Aspire.StackExchange.Redis\Aspire.StackExchange.Redis.csproj (Aspire.StackExchange.Redis)
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
 
using System.Collections.Concurrent;
using System.Diagnostics;
using OpenTelemetry.Instrumentation.StackExchangeRedis.Implementation;
using OpenTelemetry.Internal;
using OpenTelemetry.Trace;
using StackExchange.Redis;
using StackExchange.Redis.Profiling;
 
namespace OpenTelemetry.Instrumentation.StackExchangeRedis;
 
/// <summary>
/// StackExchange.Redis <see cref="IConnectionMultiplexer"/> instrumentation.
/// </summary>
internal sealed class StackExchangeRedisConnectionInstrumentation : IDisposable
{
    internal const string RedisDatabaseIndexKeyName = "db.redis.database_index";
    internal const string RedisFlagsKeyName = "db.redis.flags";
    internal const string ActivitySourceName = "OpenTelemetry.Instrumentation.StackExchangeRedis";
    internal const string ActivityName = ActivitySourceName + ".Execute";
    internal static readonly Version Version = new Version(1, 0, 0, 13);
    internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version.ToString());
    internal static readonly IEnumerable<KeyValuePair<string, object?>> CreationTags = new[]
    {
        new KeyValuePair<string, object?>(SemanticConventions.AttributeDbSystem, "redis"),
    };
 
    internal readonly ConcurrentDictionary<(ActivityTraceId TraceId, ActivitySpanId SpanId), (Activity Activity, ProfilingSession Session)> Cache
        = new();
 
    private readonly StackExchangeRedisInstrumentationOptions options;
    private readonly EventWaitHandle stopHandle = new(false, EventResetMode.ManualReset);
    private readonly Thread drainThread;
 
    private readonly ProfilingSession defaultSession = new();
 
    /// <summary>
    /// Initializes a new instance of the <see cref="StackExchangeRedisConnectionInstrumentation"/> class.
    /// </summary>
    /// <param name="connection"><see cref="IConnectionMultiplexer"/> to instrument.</param>
    /// <param name="name">Optional name for the connection.</param>
    /// <param name="options">Configuration options for redis instrumentation.</param>
    public StackExchangeRedisConnectionInstrumentation(
        IConnectionMultiplexer connection,
        string? name,
        StackExchangeRedisInstrumentationOptions options)
    {
        Guard.ThrowIfNull(connection);
 
        this.options = options ?? new StackExchangeRedisInstrumentationOptions();
 
        this.drainThread = new Thread(this.DrainEntries)
        {
            Name = string.IsNullOrWhiteSpace(name) ? "OpenTelemetry.Redis" : $"OpenTelemetry.Redis{{{name}}}",
            IsBackground = true,
        };
        this.drainThread.Start();
 
        connection.RegisterProfiler(this.GetProfilerSessionsFactory());
    }
 
    /// <summary>
    /// Returns session for the Redis calls recording.
    /// </summary>
    /// <returns>Session associated with the current span context to record Redis calls.</returns>
    public Func<ProfilingSession?> GetProfilerSessionsFactory()
    {
        return () =>
        {
            if (this.stopHandle.WaitOne(0))
            {
                return null;
            }
 
            var parent = Activity.Current;
 
            // If no parent use the default session.
            if (parent == null || parent.IdFormat != ActivityIdFormat.W3C)
            {
                return this.defaultSession;
            }
 
            // Try to reuse a session for all activities created under the same TraceId+SpanId.
            var cacheKey = (parent.TraceId, parent.SpanId);
            if (!this.Cache.TryGetValue(cacheKey, out var session))
            {
                session = (parent, new ProfilingSession());
                this.Cache.TryAdd(cacheKey, session);
            }
 
            return session.Session;
        };
    }
 
    /// <inheritdoc/>
    public void Dispose()
    {
        this.stopHandle.Set();
        this.drainThread.Join();
 
        this.Flush();
 
        this.stopHandle.Dispose();
    }
 
    internal void Flush()
    {
        RedisProfilerEntryToActivityConverter.DrainSession(null, this.defaultSession.FinishProfiling(), this.options);
 
        foreach (var entry in this.Cache)
        {
            var parent = entry.Value.Activity;
            if (parent.Duration == TimeSpan.Zero)
            {
                // Activity is still running, don't drain.
                continue;
            }
 
            ProfilingSession session = entry.Value.Session;
            RedisProfilerEntryToActivityConverter.DrainSession(parent, session.FinishProfiling(), this.options);
            this.Cache.TryRemove((entry.Key.TraceId, entry.Key.SpanId), out _);
        }
    }
 
    private void DrainEntries(object? state)
    {
        while (true)
        {
            if (this.stopHandle.WaitOne(this.options.FlushInterval))
            {
                break;
            }
 
            this.Flush();
        }
    }
}