<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="9.0.5" />

AggregationManager

sealed class AggregationManager
using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Runtime.Versioning; using System.Security; using System.Threading; namespace System.Diagnostics.Metrics { [System.Runtime.Versioning.UnsupportedOSPlatform("browser")] [SecuritySafeCritical] internal sealed class AggregationManager { public const double MinCollectionTimeSecs = 0.1; private static readonly QuantileAggregation s_defaultHistogramConfig = new QuantileAggregation(0.5, 0.95, 0.99); private readonly List<Predicate<Instrument>> _instrumentConfigFuncs = new List<Predicate<Instrument>>(); private Dictionary<Instrument, bool> _instruments = new Dictionary<Instrument, bool>(); private readonly ConcurrentDictionary<Instrument, InstrumentState> _instrumentStates = new ConcurrentDictionary<Instrument, InstrumentState>(); private readonly CancellationTokenSource _cts = new CancellationTokenSource(); private Thread _collectThread; private readonly MeterListener _listener; private int _currentTimeSeries; private int _currentHistograms; private readonly Action<Instrument, LabeledAggregationStatistics, InstrumentState> _collectMeasurement; private readonly Action<DateTime, DateTime> _beginCollection; private readonly Action<DateTime, DateTime> _endCollection; private readonly Action<Instrument, InstrumentState> _beginInstrumentMeasurements; private readonly Action<Instrument, InstrumentState> _endInstrumentMeasurements; private readonly Action<Instrument, InstrumentState> _instrumentPublished; private readonly Action _initialInstrumentEnumerationComplete; private readonly Action<Exception> _collectionError; private readonly Action _timeSeriesLimitReached; private readonly Action _histogramLimitReached; private readonly Action<Exception> _observableInstrumentCallbackError; public TimeSpan CollectionPeriod { get; set; } public int MaxTimeSeries { get; } public int MaxHistograms { get; } public AggregationManager(int maxTimeSeries, int maxHistograms, Action<Instrument, LabeledAggregationStatistics, InstrumentState> collectMeasurement, Action<DateTime, DateTime> beginCollection, Action<DateTime, DateTime> endCollection, Action<Instrument, InstrumentState> beginInstrumentMeasurements, Action<Instrument, InstrumentState> endInstrumentMeasurements, Action<Instrument, InstrumentState> instrumentPublished, Action initialInstrumentEnumerationComplete, Action<Exception> collectionError, Action timeSeriesLimitReached, Action histogramLimitReached, Action<Exception> observableInstrumentCallbackError) { MaxTimeSeries = maxTimeSeries; MaxHistograms = maxHistograms; _collectMeasurement = collectMeasurement; _beginCollection = beginCollection; _endCollection = endCollection; _beginInstrumentMeasurements = beginInstrumentMeasurements; _endInstrumentMeasurements = endInstrumentMeasurements; _instrumentPublished = instrumentPublished; _initialInstrumentEnumerationComplete = initialInstrumentEnumerationComplete; _collectionError = collectionError; _timeSeriesLimitReached = timeSeriesLimitReached; _histogramLimitReached = histogramLimitReached; _observableInstrumentCallbackError = observableInstrumentCallbackError; _listener = new MeterListener(); MeterListener listener = _listener; listener.InstrumentPublished = (Action<Instrument, MeterListener>)Delegate.Combine(listener.InstrumentPublished, new Action<Instrument, MeterListener>(PublishedInstrument)); MeterListener listener2 = _listener; listener2.MeasurementsCompleted = (Action<Instrument, object>)Delegate.Combine(listener2.MeasurementsCompleted, new Action<Instrument, object>(CompletedMeasurements)); _listener.SetMeasurementEventCallback(delegate(Instrument i, double m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update(m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, float m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, long m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, int m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, short m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, byte m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)(int)m, l); }); _listener.SetMeasurementEventCallback(delegate(Instrument i, decimal m, ReadOnlySpan<KeyValuePair<string, object>> l, object c) { ((InstrumentState)c).Update((double)m, l); }); } public void Include(string meterName) { Include((Instrument i) => i.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase)); } public void IncludeAll() { Include((Instrument i) => true); } public void IncludePrefix(string meterNamePrefix) { Include((Instrument i) => i.Meter.Name.StartsWith(meterNamePrefix, StringComparison.OrdinalIgnoreCase)); } public void Include(string meterName, string instrumentName) { Include(delegate(Instrument i) { if (i.Meter.Name.Equals(meterName, StringComparison.OrdinalIgnoreCase)) return i.Name.Equals(instrumentName, StringComparison.OrdinalIgnoreCase); return false; }); } private void Include(Predicate<Instrument> instrumentFilter) { lock (this) { _instrumentConfigFuncs.Add(instrumentFilter); } } public AggregationManager SetCollectionPeriod(TimeSpan collectionPeriod) { lock (this) { CollectionPeriod = collectionPeriod; return this; } } private void CompletedMeasurements(Instrument instrument, object cookie) { _instruments.Remove(instrument); _endInstrumentMeasurements(instrument, (InstrumentState)cookie); RemoveInstrumentState(instrument); } private void PublishedInstrument(Instrument instrument, MeterListener _) { InstrumentState instrumentState = GetInstrumentState(instrument); _instrumentPublished(instrument, instrumentState); if (instrumentState != null) { _beginInstrumentMeasurements(instrument, instrumentState); if (!_instruments.ContainsKey(instrument)) { _listener.EnableMeasurementEvents(instrument, instrumentState); _instruments.Add(instrument, true); } } } public void Start() { _collectThread = new Thread((ThreadStart)delegate { CollectWorker(_cts.Token); }); _collectThread.IsBackground = true; _collectThread.Name = "MetricsEventSource CollectWorker"; _collectThread.Start(); _listener.Start(); _initialInstrumentEnumerationComplete(); } public void Update() { using (MeterListener meterListener = new MeterListener()) { MeterListener meterListener2 = meterListener; meterListener2.InstrumentPublished = (Action<Instrument, MeterListener>)Delegate.Combine(meterListener2.InstrumentPublished, new Action<Instrument, MeterListener>(PublishedInstrument)); MeterListener meterListener3 = meterListener; meterListener3.MeasurementsCompleted = (Action<Instrument, object>)Delegate.Combine(meterListener3.MeasurementsCompleted, new Action<Instrument, object>(CompletedMeasurements)); meterListener.Start(); } _initialInstrumentEnumerationComplete(); } private void CollectWorker(CancellationToken cancelToken) { try { double num = -1; TimeSpan timeSpan; lock (this) { timeSpan = CollectionPeriod; num = timeSpan.TotalSeconds; } DateTime utcNow = DateTime.UtcNow; DateTime arg = utcNow; while (!cancelToken.IsCancellationRequested) { DateTime utcNow2 = DateTime.UtcNow; timeSpan = utcNow2 - utcNow; double value = Math.Ceiling(timeSpan.TotalSeconds / num) * num; DateTime dateTime = utcNow.AddSeconds(value); DateTime dateTime2 = arg.AddSeconds(num); if (dateTime <= dateTime2) dateTime = dateTime2; TimeSpan timeout = dateTime - utcNow2; if (cancelToken.WaitHandle.WaitOne(timeout)) break; _beginCollection(arg, dateTime); Collect(); _endCollection(arg, dateTime); arg = dateTime; } } catch (Exception obj) { _collectionError(obj); } } public void Dispose() { _cts.Cancel(); if (_collectThread != null) { _collectThread.Join(); _collectThread = null; } _listener.Dispose(); } private void RemoveInstrumentState(Instrument instrument) { _instrumentStates.TryRemove(instrument, out InstrumentState _); } private InstrumentState GetInstrumentState(Instrument instrument) { if (!_instrumentStates.TryGetValue(instrument, out InstrumentState value)) { lock (this) { foreach (Predicate<Instrument> instrumentConfigFunc in _instrumentConfigFuncs) { if (instrumentConfigFunc(instrument)) { value = BuildInstrumentState(instrument); if (value == null) return value; _instrumentStates.TryAdd(instrument, value); _instrumentStates.TryGetValue(instrument, out value); return value; } } return value; } } return value; } [System.Diagnostics.CodeAnalysis.UnconditionalSuppressMessage("AotAnalysis", "IL3050:RequiresDynamicCode", Justification = "MakeGenericType is creating instances over reference types that works fine in AOT.")] internal InstrumentState BuildInstrumentState(Instrument instrument) { Func<Aggregator> aggregatorFactory = GetAggregatorFactory(instrument); if (aggregatorFactory == null) return null; Type type = aggregatorFactory.GetType().GenericTypeArguments[0]; return (InstrumentState)Activator.CreateInstance(typeof(InstrumentState<>).MakeGenericType(type), aggregatorFactory); } private Func<Aggregator> GetAggregatorFactory(Instrument instrument) { Type type = instrument.GetType(); Type type2 = null; type2 = (type.IsGenericType ? type.GetGenericTypeDefinition() : null); if (type2 == typeof(Counter<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new CounterAggregator(true) : null; } }; if (type2 == typeof(ObservableCounter<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new ObservableCounterAggregator(true) : null; } }; if (type2 == typeof(ObservableGauge<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new LastValue() : null; } }; if (type2 == typeof(Gauge<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new SynchronousLastValue() : null; } }; if (type2 == typeof(Histogram<>)) return delegate { lock (this) { return (!CheckHistogramAllowed() || !CheckTimeSeriesAllowed()) ? null : new ExponentialHistogramAggregator(s_defaultHistogramConfig); } }; if (type2 == typeof(UpDownCounter<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new CounterAggregator(false) : null; } }; if (type2 == typeof(ObservableUpDownCounter<>)) return delegate { lock (this) { return CheckTimeSeriesAllowed() ? new ObservableCounterAggregator(false) : null; } }; return null; } private bool CheckTimeSeriesAllowed() { if (_currentTimeSeries < MaxTimeSeries) { _currentTimeSeries++; return true; } if (_currentTimeSeries == MaxTimeSeries) { _currentTimeSeries++; _timeSeriesLimitReached(); return false; } return false; } private bool CheckHistogramAllowed() { if (_currentHistograms < MaxHistograms) { _currentHistograms++; return true; } if (_currentHistograms == MaxHistograms) { _currentHistograms++; _histogramLimitReached(); return false; } return false; } internal void Collect() { try { _listener.RecordObservableInstruments(); } catch (Exception obj) { _observableInstrumentCallbackError(obj); } foreach (KeyValuePair<Instrument, InstrumentState> instrumentState in _instrumentStates) { instrumentState.Value.Collect(instrumentState.Key, delegate(LabeledAggregationStatistics labeledAggStats) { _collectMeasurement(instrumentState.Key, labeledAggStats, instrumentState.Value); }); } } } }