Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Prometheus historical data collector example
using Axinom.Toolkit;
using NLog;
using Prometheus.Advanced;
using Prometheus.Advanced.DataContracts;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
namespace HistoricalControllingStatsExporter
{
    /// <summary>
    /// Implements a collector registry that exports large batches of historical data, collecting over and over
    /// until there is no more data remaining. Collectors are expected to timestamp every sample.
    /// </summary>
    sealed class HistoricalDataCollectorRegistry : ICollectorRegistry
    {
        // Name of the private field that SetAllTimestamps() reads via reflection to reach a collector's children.
        private const string LabelledMetricsFieldName = "_labelledMetrics";

        private static readonly Logger _log = LogManager.GetLogger("HistoricalDataCollectorRegistry");

        // Registered collectors, keyed by collector name.
        private readonly ConcurrentDictionary<string, ICollector> _collectors = new ConcurrentDictionary<string, ICollector>();

        private readonly Func<bool> _update;
        private readonly TimeSpan _timeLimit;

        /// <param name="update">
        /// Called to update the collectors with new data.
        /// If the method returns false, there is no more data to provide and the collection will end.
        /// </param>
        /// <param name="timeLimit">
        /// As much data as fits within this collection time limit will be collected, sending the data
        /// once the limit has been exceeded. Should be well below the Prometheus timeout, to allow for processing time.
        /// </param>
        public HistoricalDataCollectorRegistry(Func<bool> update, TimeSpan timeLimit)
        {
            Helpers.Argument.ValidateIsNotNull(update, nameof(update));

            _update = update;
            _timeLimit = timeLimit;
        }

        /// <summary>
        /// Repeatedly invokes the update callback and then collects from every registered collector,
        /// accumulating the metric families of all rounds into a single result.
        /// </summary>
        /// <returns>
        /// The metric families gathered during all completed rounds. Collectors that return null are skipped.
        /// </returns>
        /// <remarks>
        /// The loop ends when the update callback returns false (nothing is collected for that round) or when
        /// the time limit has elapsed. The limit is only checked before a round starts, so the round in
        /// progress always runs to completion and may overrun the limit.
        /// </remarks>
        public IEnumerable<MetricFamily> CollectAll()
        {
            var stopwatch = Stopwatch.StartNew();
            var result = new List<MetricFamily>();

            while (stopwatch.Elapsed < _timeLimit)
            {
                if (!_update())
                {
                    _log.Debug("Finishing collection because update reported no more data available.");
                    return result;
                }

                result.AddRange(_collectors.Values
                    .Select(c => c.Collect())
                    .Where(mf => mf != null));
            }

            _log.Debug("Finishing collection because deadline passed.");
            return result;
        }

        /// <summary>
        /// Registers the collector under its name, or returns the collector already registered under that name.
        /// </summary>
        /// <param name="collector">The collector to register.</param>
        /// <returns>The collector that is registered under the name after the call.</returns>
        /// <exception cref="InvalidOperationException">
        /// A collector with the same name is already registered but its label names differ.
        /// </exception>
        public ICollector GetOrAdd(ICollector collector)
        {
            Helpers.Argument.ValidateIsNotNull(collector, nameof(collector));

            var collectorToUse = _collectors.GetOrAdd(collector.Name, collector);

            if (!collector.LabelNames.SequenceEqual(collectorToUse.LabelNames))
            {
                throw new InvalidOperationException("Collector with same name must have same label names");
            }

            return collectorToUse;
        }

        /// <summary>
        /// Removes whichever collector is registered under the given collector's name.
        /// </summary>
        /// <param name="collector">The collector whose name identifies the registration to remove.</param>
        /// <returns>True if a registration was removed; false if nothing was registered under that name.</returns>
        public bool Remove(ICollector collector)
        {
            Helpers.Argument.ValidateIsNotNull(collector, nameof(collector));

            ICollector removed;
            return _collectors.TryRemove(collector.Name, out removed);
        }

        /// <summary>
        /// Applies the timestamp to every child metric of every registered collector.
        /// </summary>
        /// <param name="timestamp">The timestamp to set on each child metric.</param>
        /// <exception cref="InvalidOperationException">
        /// A registered collector does not expose the expected private field or that field's value
        /// does not have a readable "Values" property.
        /// </exception>
        /// <remarks>
        /// Not thread-safe! The children are reached through reflection on a private field of the
        /// collector's base type, so this depends on implementation details of the metrics library
        /// (NOTE(review): confirm the field name whenever the library is upgraded).
        /// </remarks>
        public void SetAllTimestamps(DateTimeOffset timestamp)
        {
            foreach (var collector in _collectors.Values)
            {
                foreach (Child metric in GetLabelledMetrics(collector))
                {
                    metric.SetTimestamp(timestamp);
                }
            }
        }

        // Reads the values of the collector's private labelled-metrics container via reflection,
        // failing with a descriptive error instead of a NullReferenceException when the internals differ.
        private static IEnumerable GetLabelledMetrics(ICollector collector)
        {
            var collectorType = collector.GetType();

            var labelledMetricsField = FindLabelledMetricsField(collectorType);
            if (labelledMetricsField == null)
            {
                throw new InvalidOperationException(
                    $"Collector type {collectorType.FullName} has no private '{LabelledMetricsFieldName}' field on any of its base types.");
            }

            var labelledMetrics = labelledMetricsField.GetValue(collector);
            if (labelledMetrics == null)
            {
                throw new InvalidOperationException(
                    $"Field '{LabelledMetricsFieldName}' of collector type {collectorType.FullName} is null.");
            }

            var valuesProperty = labelledMetrics.GetType().GetProperty("Values");
            if (valuesProperty == null || !valuesProperty.CanRead)
            {
                throw new InvalidOperationException(
                    $"Field '{LabelledMetricsFieldName}' of collector type {collectorType.FullName} has no readable 'Values' property.");
            }

            var values = valuesProperty.GetValue(labelledMetrics, null) as IEnumerable;
            if (values == null)
            {
                throw new InvalidOperationException(
                    $"The 'Values' property behind '{LabelledMetricsFieldName}' of collector type {collectorType.FullName} did not yield an enumerable.");
            }

            return values;
        }

        // Searches the base-type chain for the field. Reflection does not return private fields that are
        // declared on an ancestor, so each ancestor must be queried individually. The search starts at the
        // immediate base type, which is the only place the lookup previously considered.
        private static FieldInfo FindLabelledMetricsField(Type collectorType)
        {
            for (var type = collectorType.BaseType; type != null; type = type.BaseType)
            {
                var field = type.GetField(LabelledMetricsFieldName, BindingFlags.Instance | BindingFlags.NonPublic);
                if (field != null)
                {
                    return field;
                }
            }

            return null;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment