File: System\Diagnostics\Metrics\AggregationManager.cs
Web Access
Project: src\src\libraries\System.Diagnostics.DiagnosticSource\src\System.Diagnostics.DiagnosticSource.csproj (System.Diagnostics.DiagnosticSource)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.Versioning;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
 
namespace System.Diagnostics.Metrics
{
    [UnsupportedOSPlatform("browser")]
    [SecuritySafeCritical]
    internal sealed class AggregationManager
    {
        public const double MinCollectionTimeSecs = 0.1;
        private static readonly QuantileAggregation s_defaultHistogramConfig = new QuantileAggregation(new double[] { 0.50, 0.95, 0.99 });
 
        // these fields are modified after construction and accessed on multiple threads, use lock(this) to ensure the data
        // is synchronized
        private readonly List<Predicate<Instrument>> _instrumentConfigFuncs = new();
        public TimeSpan CollectionPeriod { get; private set; }
 
        public int MaxTimeSeries { get; }
        public int MaxHistograms { get; }
        private Dictionary<Instrument, bool> _instruments = new();
        private readonly ConcurrentDictionary<Instrument, InstrumentState> _instrumentStates = new();
        private readonly CancellationTokenSource _cts = new();
        private Thread? _collectThread;
        private readonly MeterListener _listener;
        private int _currentTimeSeries;
        private int _currentHistograms;
        private readonly Action<Instrument, LabeledAggregationStatistics, InstrumentState?> _collectMeasurement;
        private readonly Action<DateTime, DateTime> _beginCollection;
        private readonly Action<DateTime, DateTime> _endCollection;
        private readonly Action<Instrument, InstrumentState> _beginInstrumentMeasurements;
        private readonly Action<Instrument, InstrumentState> _endInstrumentMeasurements;
        private readonly Action<Instrument, InstrumentState?> _instrumentPublished;
        private readonly Action _initialInstrumentEnumerationComplete;
        private readonly Action<Exception> _collectionError;
        private readonly Action _timeSeriesLimitReached;
        private readonly Action _histogramLimitReached;
        private readonly Action<Exception> _observableInstrumentCallbackError;
 
        public AggregationManager(
            int maxTimeSeries,
            int maxHistograms,
            Action<Instrument, LabeledAggregationStatistics, InstrumentState?> collectMeasurement,
            Action<DateTime, DateTime> beginCollection,
            Action<DateTime, DateTime> endCollection,
            Action<Instrument, InstrumentState> beginInstrumentMeasurements,
            Action<Instrument, InstrumentState> endInstrumentMeasurements,
            Action<Instrument, InstrumentState?> instrumentPublished,
            Action initialInstrumentEnumerationComplete,
            Action<Exception> collectionError,
            Action timeSeriesLimitReached,
            Action histogramLimitReached,
            Action<Exception> observableInstrumentCallbackError)
        {
            MaxTimeSeries = maxTimeSeries;
            MaxHistograms = maxHistograms;
            _collectMeasurement = collectMeasurement;
            _beginCollection = beginCollection;
            _endCollection = endCollection;
            _beginInstrumentMeasurements = beginInstrumentMeasurements;
            _endInstrumentMeasurements = endInstrumentMeasurements;
            _instrumentPublished = instrumentPublished;
            _initialInstrumentEnumerationComplete = initialInstrumentEnumerationComplete;
            _collectionError = collectionError;
            _timeSeriesLimitReached = timeSeriesLimitReached;
            _histogramLimitReached = histogramLimitReached;
            _observableInstrumentCallbackError = observableInstrumentCallbackError;
 
            _listener = new MeterListener();
            _listener.InstrumentPublished += PublishedInstrument;
            _listener.MeasurementsCompleted += CompletedMeasurements;
 
            _listener.SetMeasurementEventCallback<double>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<float>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<long>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<int>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<short>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<byte>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
            _listener.SetMeasurementEventCallback<decimal>((i, m, l, c) => ((InstrumentState)c!).Update((double)m, l));
        }
 
        public void Include(string meterName)
        {
            Include(i => i.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase));
        }
 
        public void IncludeAll()
        {
            Include(i => true);
        }
 
        public void IncludePrefix(string meterNamePrefix)
        {
            Include(i => i.Meter.Name.StartsWith(meterNamePrefix, StringComparison.OrdinalIgnoreCase));
        }
 
        public void Include(string meterName, string instrumentName)
        {
            Include(i => i.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase)
                && i.Name.Equals(instrumentName, StringComparison.OrdinalIgnoreCase));
        }
 
        private void Include(Predicate<Instrument> instrumentFilter)
        {
            lock (this)
            {
                _instrumentConfigFuncs.Add(instrumentFilter);
            }
        }
 
        public AggregationManager SetCollectionPeriod(TimeSpan collectionPeriod)
        {
            // The caller, MetricsEventSource, is responsible for enforcing this
            Debug.Assert(collectionPeriod.TotalSeconds >= MinCollectionTimeSecs);
            lock (this)
            {
                CollectionPeriod = collectionPeriod;
            }
            return this;
        }
 
        private void CompletedMeasurements(Instrument instrument, object? cookie)
        {
            _instruments.Remove(instrument);
            Debug.Assert(cookie is not null);
            _endInstrumentMeasurements(instrument, (InstrumentState)cookie);
            RemoveInstrumentState(instrument);
        }
 
        private void PublishedInstrument(Instrument instrument, MeterListener _)
        {
            InstrumentState? state = GetInstrumentState(instrument);
            _instrumentPublished(instrument, state);
            if (state != null)
            {
                _beginInstrumentMeasurements(instrument, state);
#pragma warning disable CA1864 // Prefer the 'IDictionary.TryAdd(TKey, TValue)' method. IDictionary.TryAdd() is not available in one of the builds
                if (!_instruments.ContainsKey(instrument))
#pragma warning restore CA1864
                {
                    // This has side effects that prompt MeasurementsCompleted
                    // to be called if this is called multiple times on an
                    // instrument in a shared MetricsEventSource.
                    _listener.EnableMeasurementEvents(instrument, state);
                    _instruments.Add(instrument, true);
                }
            }
        }
 
        public void Start()
        {
            // if already started or already stopped we can't be started again
            Debug.Assert(_collectThread == null && !_cts.IsCancellationRequested);
            Debug.Assert(CollectionPeriod.TotalSeconds >= MinCollectionTimeSecs);
 
            // This explicitly uses a Thread and not a Task so that metrics still work
            // even when an app is experiencing thread-pool starvation. Although we
            // can't make in-proc metrics robust to everything, this is a common enough
            // problem in .NET apps that it feels worthwhile to take the precaution.
            _collectThread = new Thread(() => CollectWorker(_cts.Token));
            _collectThread.IsBackground = true;
            _collectThread.Name = "MetricsEventSource CollectWorker";
            _collectThread.Start();
 
            _listener.Start();
            _initialInstrumentEnumerationComplete();
        }
 
        public void Update()
        {
            // Creating (and destroying) a MeterListener to leverage the existing
            // mechanisms for enumerating and publishing instruments.
            using (MeterListener tempListener = new MeterListener())
            {
                tempListener.InstrumentPublished += PublishedInstrument;
                tempListener.MeasurementsCompleted += CompletedMeasurements;
                tempListener.Start();
            }
 
            _initialInstrumentEnumerationComplete();
        }
 
        private void CollectWorker(CancellationToken cancelToken)
        {
            try
            {
                double collectionIntervalSecs = -1;
                lock (this)
                {
                    collectionIntervalSecs = CollectionPeriod.TotalSeconds;
                }
                Debug.Assert(collectionIntervalSecs >= MinCollectionTimeSecs);
 
                DateTime startTime = DateTime.UtcNow;
                DateTime intervalStartTime = startTime;
                while (!cancelToken.IsCancellationRequested)
                {
                    // intervals end at startTime + X*collectionIntervalSecs. Under normal
                    // circumstance X increases by 1 each interval, but if the time it
                    // takes to do collection is very large then we might need to skip
                    // ahead multiple intervals to catch back up.
                    //
                    DateTime now = DateTime.UtcNow;
                    double secsSinceStart = (now - startTime).TotalSeconds;
                    double alignUpSecsSinceStart = Math.Ceiling(secsSinceStart / collectionIntervalSecs) *
                        collectionIntervalSecs;
                    DateTime nextIntervalStartTime = startTime.AddSeconds(alignUpSecsSinceStart);
 
                    // The delay timer precision isn't exact. We might have a situation
                    // where in the previous loop iterations intervalStartTime=20.00,
                    // nextIntervalStartTime=21.00, the timer was supposed to delay for 1s but
                    // it exited early so we looped around and DateTime.Now=20.99.
                    // Aligning up from DateTime.Now would give us 21.00 again so we also need to skip
                    // forward one time interval
                    DateTime minNextInterval = intervalStartTime.AddSeconds(collectionIntervalSecs);
                    if (nextIntervalStartTime <= minNextInterval)
                    {
                        nextIntervalStartTime = minNextInterval;
                    }
 
                    // pause until the interval is complete
                    TimeSpan delayTime = nextIntervalStartTime - now;
                    if (cancelToken.WaitHandle.WaitOne(delayTime))
                    {
                        // don't do collection if timer may not have run to completion
                        break;
                    }
 
                    // collect statistics for the completed interval
                    _beginCollection(intervalStartTime, nextIntervalStartTime);
                    Collect();
                    _endCollection(intervalStartTime, nextIntervalStartTime);
                    intervalStartTime = nextIntervalStartTime;
                }
            }
            catch (Exception e)
            {
                _collectionError(e);
            }
        }
 
        public void Dispose()
        {
            _cts.Cancel();
            if (_collectThread != null)
            {
                _collectThread.Join();
                _collectThread = null;
            }
            _listener.Dispose();
        }
 
        private void RemoveInstrumentState(Instrument instrument)
        {
            _instrumentStates.TryRemove(instrument, out _);
        }
 
        private InstrumentState? GetInstrumentState(Instrument instrument)
        {
            if (!_instrumentStates.TryGetValue(instrument, out InstrumentState? instrumentState))
            {
                lock (this) // protect _instrumentConfigFuncs list
                {
                    foreach (Predicate<Instrument> filter in _instrumentConfigFuncs)
                    {
                        if (filter(instrument))
                        {
                            instrumentState = BuildInstrumentState(instrument);
                            if (instrumentState != null)
                            {
                                _instrumentStates.TryAdd(instrument, instrumentState);
                                // I don't think it is possible for the instrument to be removed immediately
                                // and instrumentState = _instrumentStates[instrument] should work, but writing
                                // this defensively.
                                _instrumentStates.TryGetValue(instrument, out instrumentState);
                            }
                            break;
                        }
                    }
                }
            }
            return instrumentState;
        }
 
        [UnconditionalSuppressMessage("AotAnalysis", "IL3050:RequiresDynamicCode",
                        Justification = "MakeGenericType is creating instances over reference types that works fine in AOT.")]
        internal InstrumentState? BuildInstrumentState(Instrument instrument)
        {
            Func<Aggregator?>? createAggregatorFunc = GetAggregatorFactory(instrument);
            if (createAggregatorFunc == null)
            {
                return null;
            }
            Type aggregatorType = createAggregatorFunc.GetType().GenericTypeArguments[0];
            Type instrumentStateType = typeof(InstrumentState<>).MakeGenericType(aggregatorType);
            return (InstrumentState)Activator.CreateInstance(instrumentStateType, createAggregatorFunc)!;
        }
 
        private Func<Aggregator?>? GetAggregatorFactory(Instrument instrument)
        {
            Type type = instrument.GetType();
            Type? genericDefType = null;
            genericDefType = type.IsGenericType ? type.GetGenericTypeDefinition() : null;
            if (genericDefType == typeof(Counter<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new CounterAggregator(isMonotonic: true) : null;
                    }
                };
            }
            else if (genericDefType == typeof(ObservableCounter<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new ObservableCounterAggregator(isMonotonic: true) : null;
                    }
                };
            }
            else if (genericDefType == typeof(ObservableGauge<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new LastValue() : null;
                    }
                };
            }
            else if (genericDefType == typeof(Gauge<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new SynchronousLastValue() : null;
                    }
                };
            }
            else if (genericDefType == typeof(Histogram<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        // checking currentHistograms first because avoiding unexpected increment of TimeSeries count.
                        return (!CheckHistogramAllowed() || !CheckTimeSeriesAllowed()) ?
                            null :
                            new ExponentialHistogramAggregator(s_defaultHistogramConfig);
                    }
                };
            }
            else if (genericDefType == typeof(UpDownCounter<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new CounterAggregator(isMonotonic: false) : null;
                    }
                };
            }
            else if (genericDefType == typeof(ObservableUpDownCounter<>))
            {
                return () =>
                {
                    lock (this)
                    {
                        return CheckTimeSeriesAllowed() ? new ObservableCounterAggregator(isMonotonic: false) : null;
                    }
                };
            }
            else
            {
                return null;
            }
        }
 
        private bool CheckTimeSeriesAllowed()
        {
            if (_currentTimeSeries < MaxTimeSeries)
            {
                _currentTimeSeries++;
                return true;
            }
            else if (_currentTimeSeries == MaxTimeSeries)
            {
                _currentTimeSeries++;
                _timeSeriesLimitReached();
                return false;
            }
            else
            {
                return false;
            }
        }
 
        private bool CheckHistogramAllowed()
        {
            if (_currentHistograms < MaxHistograms)
            {
                _currentHistograms++;
                return true;
            }
            else if (_currentHistograms == MaxHistograms)
            {
                _currentHistograms++;
                _histogramLimitReached();
                return false;
            }
            else
            {
                return false;
            }
        }
 
        internal void Collect()
        {
            try
            {
                _listener.RecordObservableInstruments();
            }
            catch (Exception e)
            {
                _observableInstrumentCallbackError(e);
            }
 
            foreach (KeyValuePair<Instrument, InstrumentState> kv in _instrumentStates)
            {
                kv.Value.Collect(kv.Key, (LabeledAggregationStatistics labeledAggStats) =>
                {
                    _collectMeasurement(kv.Key, labeledAggStats, kv.Value);
                });
            }
        }
    }
}