// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.Versioning;
using System.Security;
using System.Threading;
using System.Threading.Tasks;
namespace System.Diagnostics.Metrics
internal sealed class AggregationManager
// Smallest collection period, in seconds, that SetCollectionPeriod() and Start() assert on.
public const double MinCollectionTimeSecs = 0.1;
// Configuration passed to ExponentialHistogramAggregator for Histogram<T> instruments.
// Presumably the quantiles (p50/p95/p99) to report -- confirm against QuantileAggregation.
private static readonly QuantileAggregation s_defaultHistogramConfig = new QuantileAggregation(new double[] { 0.50, 0.95, 0.99 });
// These fields are modified after construction and accessed on multiple threads; use lock(this) to ensure
// the data is synchronized.
// Include filters; GetInstrumentState evaluates them to decide whether an instrument is tracked.
private readonly List<Predicate<Instrument>> _instrumentConfigFuncs = new();
// Set through SetCollectionPeriod() and read by CollectWorker(), both under lock(this).
public TimeSpan CollectionPeriod { get; private set; }
// Limit that CheckTimeSeriesAllowed() compares _currentTimeSeries against.
public int MaxTimeSeries { get; }
// Limit that CheckHistogramAllowed() compares _currentHistograms against.
public int MaxHistograms { get; }
// Instruments for which measurement events have already been enabled on _listener.
// Used as a set: the only value ever stored is true.
private Dictionary<Instrument, bool> _instruments = new();
// Aggregation state for each tracked instrument.
private readonly ConcurrentDictionary<Instrument, InstrumentState> _instrumentStates = new();
// Source of the cancellation token handed to CollectWorker().
private readonly CancellationTokenSource _cts = new();
// Dedicated collection thread created by Start(); null before Start() and after Dispose().
private Thread? _collectThread;
// Listener whose measurement callbacks feed InstrumentState.Update().
private readonly MeterListener _listener;
// Counters compared against MaxTimeSeries / MaxHistograms.
private int _currentTimeSeries;
private int _currentHistograms;
// Callbacks supplied by the constructor's caller; each is stored once and never reassigned.
private readonly Action<Instrument, LabeledAggregationStatistics, InstrumentState?> _collectMeasurement;
private readonly Action<DateTime, DateTime> _beginCollection;
private readonly Action<DateTime, DateTime> _endCollection;
private readonly Action<Instrument, InstrumentState> _beginInstrumentMeasurements;
private readonly Action<Instrument, InstrumentState> _endInstrumentMeasurements;
private readonly Action<Instrument, InstrumentState?> _instrumentPublished;
private readonly Action _initialInstrumentEnumerationComplete;
private readonly Action<Exception> _collectionError;
private readonly Action _timeSeriesLimitReached;
private readonly Action _histogramLimitReached;
private readonly Action<Exception> _observableInstrumentCallbackError;
/// <summary>
/// Creates an aggregation manager with the given limits and reporting callbacks, and wires up the
/// <see cref="MeterListener"/> that routes every supported measurement type into the per-instrument state.
/// </summary>
/// <param name="maxTimeSeries">Value exposed as <see cref="MaxTimeSeries"/>.</param>
/// <param name="maxHistograms">Value exposed as <see cref="MaxHistograms"/>.</param>
/// <param name="collectMeasurement">Receives each collected statistic together with its instrument and state.</param>
/// <param name="beginCollection">Invoked with the interval start and end before an interval is reported.</param>
/// <param name="endCollection">Invoked with the interval start and end after an interval is reported.</param>
/// <param name="beginInstrumentMeasurements">Invoked when a published instrument has aggregation state.</param>
/// <param name="endInstrumentMeasurements">Invoked when measurements for an instrument complete.</param>
/// <param name="instrumentPublished">Invoked for every published instrument; the state is null when it is not tracked.</param>
/// <param name="initialInstrumentEnumerationComplete">Stored for later use by the manager.</param>
/// <param name="collectionError">Stored for later use by the manager.</param>
/// <param name="timeSeriesLimitReached">Stored for later use by the manager.</param>
/// <param name="histogramLimitReached">Stored for later use by the manager.</param>
/// <param name="observableInstrumentCallbackError">Stored for later use by the manager.</param>
public AggregationManager(
    int maxTimeSeries,
    int maxHistograms,
    Action<Instrument, LabeledAggregationStatistics, InstrumentState?> collectMeasurement,
    Action<DateTime, DateTime> beginCollection,
    Action<DateTime, DateTime> endCollection,
    Action<Instrument, InstrumentState> beginInstrumentMeasurements,
    Action<Instrument, InstrumentState> endInstrumentMeasurements,
    Action<Instrument, InstrumentState?> instrumentPublished,
    Action initialInstrumentEnumerationComplete,
    Action<Exception> collectionError,
    Action timeSeriesLimitReached,
    Action histogramLimitReached,
    Action<Exception> observableInstrumentCallbackError)
{
    // Limits.
    MaxTimeSeries = maxTimeSeries;
    MaxHistograms = maxHistograms;

    // Reporting callbacks.
    _collectMeasurement = collectMeasurement;
    _beginCollection = beginCollection;
    _endCollection = endCollection;
    _beginInstrumentMeasurements = beginInstrumentMeasurements;
    _endInstrumentMeasurements = endInstrumentMeasurements;
    _instrumentPublished = instrumentPublished;
    _initialInstrumentEnumerationComplete = initialInstrumentEnumerationComplete;
    _collectionError = collectionError;
    _timeSeriesLimitReached = timeSeriesLimitReached;
    _histogramLimitReached = histogramLimitReached;
    _observableInstrumentCallbackError = observableInstrumentCallbackError;

    // A freshly created listener has no handlers yet, so assigning is the same as subscribing.
    _listener = new MeterListener
    {
        InstrumentPublished = PublishedInstrument,
        MeasurementsCompleted = CompletedMeasurements,
    };

    // Every supported measurement type is widened to double before it reaches the instrument state,
    // which travels with the subscription as the listener cookie.
    _listener.SetMeasurementEventCallback<double>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<float>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<long>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<int>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<short>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<byte>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
    _listener.SetMeasurementEventCallback<decimal>((instrument, measurement, tags, cookie) => ((InstrumentState)cookie!).Update((double)measurement, tags));
}
/// <summary>
/// Includes every instrument whose meter name equals <paramref name="meterName"/>, ignoring case.
/// </summary>
/// <param name="meterName">Meter name to match with an ordinal, case-insensitive comparison.</param>
public void Include(string meterName) =>
    Include(instrument => instrument.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase));
/// <summary>
/// Includes every instrument, regardless of meter or instrument name.
/// </summary>
public void IncludeAll() => Include(_ => true);
/// <summary>
/// Includes every instrument whose meter name starts with <paramref name="meterNamePrefix"/>, ignoring case.
/// </summary>
/// <param name="meterNamePrefix">Prefix to match with an ordinal, case-insensitive comparison.</param>
public void IncludePrefix(string meterNamePrefix) =>
    Include(instrument => instrument.Meter.Name.StartsWith(meterNamePrefix, StringComparison.OrdinalIgnoreCase));
/// <summary>
/// Includes the instruments whose meter name and instrument name both match, ignoring case.
/// </summary>
/// <param name="meterName">Meter name to match with an ordinal, case-insensitive comparison.</param>
/// <param name="instrumentName">Instrument name to match with an ordinal, case-insensitive comparison.</param>
public void Include(string meterName, string instrumentName)
{
    Include(instrument =>
    {
        // The instrument name is only compared once the meter name has matched.
        if (!instrument.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }

        return instrument.Name.Equals(instrumentName, StringComparison.OrdinalIgnoreCase);
    });
}
// Shared implementation behind the public Include* overloads, all of which pass their predicate here.
private void Include(Predicate<Instrument> instrumentFilter)
// NOTE(review): the statement guarded by this lock is not visible in this view. Presumably it adds
// instrumentFilter to _instrumentConfigFuncs (the list GetInstrumentState reads under the same lock) -- confirm.
lock (this)
/// <summary>
/// Sets <see cref="CollectionPeriod"/> under the instance lock.
/// </summary>
/// <param name="collectionPeriod">
/// New period; in debug builds it is asserted to be at least <see cref="MinCollectionTimeSecs"/> seconds.
/// </param>
/// <returns>This instance, so calls can be chained.</returns>
public AggregationManager SetCollectionPeriod(TimeSpan collectionPeriod)
{
    // Enforcing the minimum is the job of the caller, MetricsEventSource; this only checks it in debug builds.
    Debug.Assert(collectionPeriod.TotalSeconds >= MinCollectionTimeSecs);

    lock (this)
    {
        CollectionPeriod = collectionPeriod;
    }

    return this;
}
/// <summary>
/// Listener callback for completed measurements: forwards the instrument and its state to the
/// end-of-measurements callback.
/// </summary>
/// <param name="instrument">Instrument whose measurements completed.</param>
/// <param name="cookie">The <see cref="InstrumentState"/> registered with the listener for this instrument.</param>
private void CompletedMeasurements(Instrument instrument, object? cookie)
{
    Debug.Assert(cookie is not null);

    InstrumentState state = (InstrumentState)cookie;
    _endInstrumentMeasurements(instrument, state);
}
// Listener callback for a published instrument. Looks up (or lazily creates) the instrument's state,
// reports the publication, and enables measurement events the first time the instrument is seen.
// The MeterListener argument is ignored; events are always enabled on _listener.
// NOTE(review): block delimiters are not visible in this view, so it is unclear whether the
// _instruments check below is nested inside the `state != null` branch -- confirm against the full file.
// NOTE(review): _instruments is a plain Dictionary and no lock is visible around this check-then-add;
// verify whether this callback can run concurrently.
private void PublishedInstrument(Instrument instrument, MeterListener _)
InstrumentState? state = GetInstrumentState(instrument);
// Always reported, even when state is null.
_instrumentPublished(instrument, state);
if (state != null)
_beginInstrumentMeasurements(instrument, state);
#pragma warning disable CA1864 // Prefer the 'IDictionary.TryAdd(TKey, TValue)' method. IDictionary.TryAdd() is not available in one of the builds
if (!_instruments.ContainsKey(instrument))
#pragma warning restore CA1864
// Enabling measurement events has side effects that prompt MeasurementsCompleted
// to be called if it is done multiple times for an instrument in a shared
// MetricsEventSource, so each instrument is enabled only once.
_listener.EnableMeasurementEvents(instrument, state);
_instruments.Add(instrument, true);
// Creates the dedicated background thread that runs CollectWorker with the manager's cancellation token.
// NOTE(review): no statement that starts the thread or the listener is visible in this view --
// confirm against the full file.
public void Start()
// if already started or already stopped we can't be started again
Debug.Assert(_collectThread == null && !_cts.IsCancellationRequested);
// The collection period must have been set to a valid value before starting.
Debug.Assert(CollectionPeriod.TotalSeconds >= MinCollectionTimeSecs);
// This explicitly uses a Thread and not a Task so that metrics still work
// even when an app is experiencing thread-pool starvation. Although we
// can't make in-proc metrics robust to everything, this is a common enough
// problem in .NET apps that it feels worthwhile to take the precaution.
_collectThread = new Thread(() => CollectWorker(_cts.Token));
// Background thread, so it does not keep the process alive.
_collectThread.IsBackground = true;
_collectThread.Name = "MetricsEventSource CollectWorker";
// Re-runs instrument publication through a short-lived MeterListener that uses the same
// PublishedInstrument / CompletedMeasurements handlers as the long-lived _listener.
// NOTE(review): the rest of the using block is not visible in this view (presumably the temporary
// listener is started there) -- confirm against the full file.
public void Update()
// Creating (and destroying) a MeterListener to leverage the existing
// mechanisms for enumerating and publishing instruments.
using (MeterListener tempListener = new MeterListener())
tempListener.InstrumentPublished += PublishedInstrument;
tempListener.MeasurementsCompleted += CompletedMeasurements;
// Body of the collection thread: repeatedly waits for the end of the current collection interval and
// then reports that interval through _beginCollection / _endCollection, until cancellation is requested.
// NOTE(review): several statements are not visible in this view (the right operand of the multiplication
// below, the body of the cancellation branch, the try block that pairs with the trailing catch and the
// catch body) -- confirm details against the full file.
private void CollectWorker(CancellationToken cancelToken)
double collectionIntervalSecs = -1;
// CollectionPeriod is written under lock(this), so it is read under the same lock.
lock (this)
collectionIntervalSecs = CollectionPeriod.TotalSeconds;
Debug.Assert(collectionIntervalSecs >= MinCollectionTimeSecs);
DateTime startTime = DateTime.UtcNow;
DateTime intervalStartTime = startTime;
while (!cancelToken.IsCancellationRequested)
// Intervals end at startTime + X*collectionIntervalSecs. Under normal
// circumstances X increases by 1 each interval, but if the time it
// takes to do collection is very large then we might need to skip
// ahead multiple intervals to catch back up.
DateTime now = DateTime.UtcNow;
double secsSinceStart = (now - startTime).TotalSeconds;
// NOTE(review): the right-hand operand of this multiplication is on a line that is not visible here.
double alignUpSecsSinceStart = Math.Ceiling(secsSinceStart / collectionIntervalSecs) *
DateTime nextIntervalStartTime = startTime.AddSeconds(alignUpSecsSinceStart);
// The delay timer precision isn't exact. We might have a situation
// where in the previous loop iteration intervalStartTime=20.00,
// nextIntervalStartTime=21.00, the timer was supposed to delay for 1s but
// it exited early so we looped around and DateTime.UtcNow=20.99.
// Aligning up from DateTime.UtcNow would give us 21.00 again so we also need to skip
// forward one time interval.
DateTime minNextInterval = intervalStartTime.AddSeconds(collectionIntervalSecs);
if (nextIntervalStartTime <= minNextInterval)
nextIntervalStartTime = minNextInterval;
// pause until the interval is complete
TimeSpan delayTime = nextIntervalStartTime - now;
// WaitOne returns true when the cancellation handle was signaled before the delay elapsed.
if (cancelToken.WaitHandle.WaitOne(delayTime))
// don't do collection if timer may not have run to completion
// collect statistics for the completed interval
_beginCollection(intervalStartTime, nextIntervalStartTime);
_endCollection(intervalStartTime, nextIntervalStartTime);
// The interval just reported becomes the start of the next one.
intervalStartTime = nextIntervalStartTime;
catch (Exception e)
// Tears down the manager. Only the null check and reset of _collectThread are visible in this view.
// NOTE(review): presumably the full method also cancels _cts, waits for the collection thread and
// disposes _listener -- confirm against the full file.
public void Dispose()
if (_collectThread != null)
_collectThread = null;
/// <summary>
/// Drops the aggregation state tracked for <paramref name="instrument"/>; does nothing when none is tracked.
/// </summary>
/// <param name="instrument">Instrument whose state should be forgotten.</param>
private void RemoveInstrumentState(Instrument instrument) =>
    _instrumentStates.TryRemove(instrument, out _);
/// <summary>
/// Returns the aggregation state for <paramref name="instrument"/>, creating and registering it on
/// first use when at least one include filter accepts the instrument.
/// </summary>
/// <param name="instrument">Instrument to look up.</param>
/// <returns>
/// The state shared by all callers for this instrument, or null when no filter matches or
/// <see cref="BuildInstrumentState"/> cannot produce a state for the instrument.
/// </returns>
private InstrumentState? GetInstrumentState(Instrument instrument)
{
    // Fast path: the instrument is already tracked.
    if (_instrumentStates.TryGetValue(instrument, out InstrumentState? instrumentState))
    {
        return instrumentState;
    }

    lock (this) // protect _instrumentConfigFuncs list
    {
        foreach (Predicate<Instrument> filter in _instrumentConfigFuncs)
        {
            if (!filter(instrument))
            {
                continue;
            }

            // One matching filter is enough. Stopping here avoids building a throw-away state for
            // every additional filter that also matches.
            InstrumentState? created = BuildInstrumentState(instrument);
            if (created is null)
            {
                return null;
            }

            // GetOrAdd atomically registers the new state or hands back the one another caller
            // registered first, so every caller observes the same instance.
            return _instrumentStates.GetOrAdd(instrument, created);
        }
    }

    return null;
}
/// <summary>
/// Builds a new <see cref="InstrumentState"/> for <paramref name="instrument"/> around the aggregator
/// factory chosen for its instrument type.
/// </summary>
/// <param name="instrument">Instrument to build state for.</param>
/// <returns>The new state, or null when no aggregator factory exists for the instrument.</returns>
[UnconditionalSuppressMessage("AotAnalysis", "IL3050:RequiresDynamicCode",
    Justification = "MakeGenericType is creating instances over reference types that works fine in AOT.")]
internal InstrumentState? BuildInstrumentState(Instrument instrument)
{
    Func<Aggregator?>? factory = GetAggregatorFactory(instrument);
    if (factory is null)
    {
        return null;
    }

    // Close InstrumentState<> over the factory delegate's own generic type argument and
    // construct it via reflection, passing the factory to the constructor.
    Type[] factoryTypeArgs = factory.GetType().GenericTypeArguments;
    Type closedStateType = typeof(InstrumentState<>).MakeGenericType(factoryTypeArgs[0]);
    object? state = Activator.CreateInstance(closedStateType, factory);
    return (InstrumentState)state!;
}
/// <summary>
/// Selects the aggregator factory matching the generic type definition of <paramref name="instrument"/>.
/// </summary>
/// <param name="instrument">Instrument whose runtime type decides the aggregator kind.</param>
/// <returns>
/// A factory that, under the instance lock, creates the aggregator or returns null when
/// <see cref="CheckTimeSeriesAllowed"/> (and, for histograms, <see cref="CheckHistogramAllowed"/>)
/// refuses; null when the instrument type is non-generic or not recognized.
/// </returns>
private Func<Aggregator?>? GetAggregatorFactory(Instrument instrument)
{
    Type instrumentType = instrument.GetType();
    if (!instrumentType.IsGenericType)
    {
        return null;
    }

    Type definition = instrumentType.GetGenericTypeDefinition();

    // Wraps an aggregator constructor so that it only runs, under the instance lock,
    // when the time series check passes.
    Func<Aggregator?> WhenTimeSeriesAllowed(Func<Aggregator> create) => () =>
    {
        lock (this)
        {
            return CheckTimeSeriesAllowed() ? create() : null;
        }
    };

    if (definition == typeof(Counter<>))
    {
        return WhenTimeSeriesAllowed(() => new CounterAggregator(isMonotonic: true));
    }

    if (definition == typeof(ObservableCounter<>))
    {
        return WhenTimeSeriesAllowed(() => new ObservableCounterAggregator(isMonotonic: true));
    }

    if (definition == typeof(ObservableGauge<>))
    {
        return WhenTimeSeriesAllowed(() => new LastValue());
    }

    if (definition == typeof(Gauge<>))
    {
        return WhenTimeSeriesAllowed(() => new SynchronousLastValue());
    }

    if (definition == typeof(Histogram<>))
    {
        return () =>
        {
            lock (this)
            {
                // The histogram check runs first so that a refused histogram never reaches the
                // time series check (avoiding an unexpected increment of the time series count).
                if (CheckHistogramAllowed() && CheckTimeSeriesAllowed())
                {
                    return new ExponentialHistogramAggregator(s_defaultHistogramConfig);
                }

                return null;
            }
        };
    }

    if (definition == typeof(UpDownCounter<>))
    {
        return WhenTimeSeriesAllowed(() => new CounterAggregator(isMonotonic: false));
    }

    if (definition == typeof(ObservableUpDownCounter<>))
    {
        return WhenTimeSeriesAllowed(() => new ObservableCounterAggregator(isMonotonic: false));
    }

    return null;
}
// Reports whether another time series may be created: true while _currentTimeSeries is below
// MaxTimeSeries, false otherwise. The visible callers invoke it while holding lock(this).
// NOTE(review): as visible here the two `false` paths are indistinguishable and nothing increments
// _currentTimeSeries or invokes _timeSeriesLimitReached; statements are probably missing from this
// view -- confirm against the full file.
private bool CheckTimeSeriesAllowed()
if (_currentTimeSeries < MaxTimeSeries)
return true;
else if (_currentTimeSeries == MaxTimeSeries)
return false;
return false;
// Reports whether another histogram may be created: true while _currentHistograms is below
// MaxHistograms, false otherwise. The visible caller invokes it while holding lock(this).
// NOTE(review): as visible here the two `false` paths are indistinguishable and nothing increments
// _currentHistograms or invokes _histogramLimitReached; statements are probably missing from this
// view -- confirm against the full file.
private bool CheckHistogramAllowed()
if (_currentHistograms < MaxHistograms)
return true;
else if (_currentHistograms == MaxHistograms)
return false;
return false;
// Walks every tracked instrument state, asks it to collect, and forwards each resulting
// LabeledAggregationStatistics to _collectMeasurement together with the instrument and its state.
// NOTE(review): the try block that pairs with the catch below, and the catch body, are not visible
// in this view, and the method may continue past the last visible line -- confirm against the full file.
internal void Collect()
catch (Exception e)
foreach (KeyValuePair<Instrument, InstrumentState> kv in _instrumentStates)
kv.Value.Collect(kv.Key, (LabeledAggregationStatistics labeledAggStats) =>
_collectMeasurement(kv.Key, labeledAggStats, kv.Value);