Skip to content

Instantly share code, notes, and snippets.

@slashmili
Last active September 14, 2021 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slashmili/67db483616ad93be3033f95ce7babe92 to your computer and use it in GitHub Desktop.
Save slashmili/67db483616ad93be3033f95ce7babe92 to your computer and use it in GitHub Desktop.
Telemetry Metrics Reporter for Fluxter & InfluxDB
defmodule Metrics.InfluxReporter do
  @moduledoc """
  A `Telemetry.Metrics` reporter that forwards events to an InfluxDB writer function.

  This reporter ignores the metric type and simply writes every reported
  measurement through the configured `:influx_writer`.

  ## Usage

  Add the reporter to a supervision tree:

      children = [
        {Metrics.InfluxReporter, metrics: metrics(), influx_writer: &MyApp.Fluxter.write/3}
      ]

  On start it attaches itself to the events described by the given metrics and
  reports those events to InfluxDB. Read more about metrics at
  https://hexdocs.pm/telemetry_metrics/Telemetry.Metrics.html

  ## Options

    * `:metrics` - (required) list of metric definitions to report.
    * `:influx_writer` - (required) a 3-arity function. It is called with the
      dot-joined event name, a keyword list of tags and the measurement value.
    * `:name` - optional name passed on to `GenServer.start_link/3`.

  ## Credits

  This module is based on Telemetry.Metrics.ConsoleReporter which
  is released under Apache License 2.0
  https://github.com/beam-telemetry/telemetry_metrics/blob/main/lib/telemetry_metrics/console_reporter.ex
  """

  use GenServer
  require Logger

  @doc """
  Starts the reporter and links it to the calling process.

  Raises `ArgumentError` when `:influx_writer` is missing, when it is not a
  function of arity 3, or when `:metrics` is missing (checked in that order).
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    server_opts = Keyword.take(opts, [:name])

    influx_writer =
      opts[:influx_writer] ||
        raise ArgumentError, "the :influx_writer option is required by #{inspect(__MODULE__)}"

    if not is_function(influx_writer, 3) do
      raise ArgumentError,
            "#{inspect(__MODULE__)} requires :influx_writer to be a function with 3 arity"
    end

    metrics =
      opts[:metrics] ||
        raise ArgumentError, "the :metrics option is required by #{inspect(__MODULE__)}"

    GenServer.start_link(__MODULE__, {metrics, influx_writer}, server_opts)
  end

  @impl true
  def init({metrics, influx_writer}) do
    # Trap exits so terminate/2 gets a chance to detach the handlers on shutdown.
    Process.flag(:trap_exit, true)

    # One handler per distinct event name; all metrics sharing that event are
    # handed to the handler through its config.
    groups = Enum.group_by(metrics, & &1.event_name)

    for {event, event_metrics} <- groups do
      id = {__MODULE__, event, self()}

      # An external capture (module-qualified) is used on purpose: telemetry
      # warns about, and penalises, handlers given as local captures.
      :telemetry.attach(id, event, &__MODULE__.handle_event/4, {event_metrics, influx_writer})
    end

    # The state is the list of attached event names, needed again in terminate/2.
    {:ok, Map.keys(groups)}
  end

  @impl true
  def terminate(_, events) do
    for event <- events do
      :telemetry.detach({__MODULE__, event, self()})
    end

    :ok
  end

  # Telemetry handler. It is public only so it can be attached as an external
  # function capture; it is not part of the documented API.
  #
  # This function must follow logics as described:
  # https://hexdocs.pm/telemetry_metrics/writing_reporters.html#reacting-to-events
  @doc false
  def handle_event(event_name, measurements, metadata, {metrics, influx_writer}) do
    event_name_in_string = Enum.join(event_name, ".")

    for %{} = metric <- metrics do
      try do
        measurement = extract_measurement(metric, measurements, metadata)
        tags = extract_tags(metric, metadata)

        cond do
          is_nil(measurement) ->
            Logger.debug(
              "No measurement is detected for #{inspect(metric)}/#{event_name_in_string}"
            )

          not keep?(metric, metadata) ->
            Logger.debug("Measurement for #{event_name_in_string} should not be recorded")

          true ->
            influx_writer.(event_name_in_string, Keyword.new(tags), measurement)
        end
      rescue
        # A failing metric (or writer) must not take down the process that
        # emitted the telemetry event, so the error is logged instead.
        e ->
          Logger.error([
            "Could not format metric #{inspect(metric)}\n",
            Exception.format(:error, e, __STACKTRACE__)
          ])
      end
    end
  end

  # A metric without a :keep function is always reported.
  defp keep?(%{keep: nil}, _metadata), do: true
  defp keep?(metric, metadata), do: metric.keep.(metadata)

  # The metric's measurement is either a function of the measurements (and
  # optionally the metadata) or a key to look up in the measurements.
  defp extract_measurement(metric, measurements, metadata) do
    case metric.measurement do
      fun when is_function(fun, 2) -> fun.(measurements, metadata)
      fun when is_function(fun, 1) -> fun.(measurements)
      key -> measurements[key]
    end
  end

  # Runs the metric's tag_values function over the metadata and keeps only the
  # tags the metric declares.
  defp extract_tags(metric, metadata) do
    tag_values = metric.tag_values.(metadata)
    Map.take(tag_values, metric.tags)
  end
end
defmodule Metrics.InfluxReporterTest do
  # Tests for Metrics.InfluxReporter: option validation in start_link/1, handler
  # attachment in init/1, and the telemetry callback that forwards events to
  # the injected writer function.
  use ExUnit.Case, async: true
  import ExUnit.CaptureLog
  alias Metrics.InfluxReporter, as: SUT
  alias Telemetry.Metrics

  describe "start_link/1" do
    test "with valid arguments" do
      assert {:ok, pid} = SUT.start_link(metrics: [], influx_writer: fn _, _, _ -> :ok end)
      assert is_pid(pid)
    end

    test "has a mandatory influx_writer option" do
      assert_raise ArgumentError,
                   "the :influx_writer option is required by Metrics.InfluxReporter",
                   fn ->
                     SUT.start_link([])
                   end
    end

    test "with invalid influx_writer option" do
      assert_raise ArgumentError,
                   "Metrics.InfluxReporter requires :influx_writer to be a function with 3 arity",
                   fn ->
                     SUT.start_link(influx_writer: :boo)
                   end
    end

    test "has a mandatory metrics option" do
      assert_raise ArgumentError,
                   "the :metrics option is required by Metrics.InfluxReporter",
                   fn ->
                     SUT.start_link(influx_writer: fn _, _, _ -> :ok end)
                   end
    end
  end

  describe "init/1" do
    test "attach to telemetry event once for duplicated metrics with same event name" do
      assert :telemetry.list_handlers([:my_event, :request, :stop]) == []

      metrics = [
        Metrics.summary("my_event.request.stop.duration",
          unit: {:native, :millisecond}
        ),
        Metrics.counter("my_event.request.stop.duration",
          unit: {:native, :millisecond}
        )
      ]

      assert {:ok, events} = SUT.init({metrics, fn _, _, _ -> :ok end})
      handlers = :telemetry.list_handlers([:my_event, :request, :stop])

      # init/1 was called directly, so the handlers are registered under the
      # test pid; detach them here so they do not leak into other tests.
      SUT.terminate(:normal, events)

      assert [_] = handlers
    end

    test "returns non-duplicate list of events" do
      metrics = [
        Metrics.summary("phoenix.endpoint.stop.duration",
          unit: {:native, :millisecond}
        ),
        Metrics.counter("phoenix.endpoint.stop.duration",
          unit: {:native, :millisecond}
        ),
        Metrics.counter("phoenix.endpoint.start.time",
          unit: {:native, :millisecond}
        )
      ]

      assert {:ok, events} = SUT.init({metrics, fn _, _, _ -> :ok end})

      # Detach the handlers that the direct init/1 call attached under the test pid.
      SUT.terminate(:normal, events)

      assert events == [[:phoenix, :endpoint, :start], [:phoenix, :endpoint, :stop]]
    end
  end

  describe "telemetry callback" do
    setup do
      test_pid = self()

      # The reporter calls the writer with (event, tags, value); echo the call
      # back to the test process so it can be asserted on.
      self_writer = fn event, tags, value -> send(test_pid, {event, tags, value}) end

      metrics = [
        Metrics.counter("my_event.request.stop.duration", tags: [:my_tag])
      ]

      %{influx_writer: self_writer, metrics: metrics}
    end

    # The reporter is started with start_supervised!/1 so that it is stopped,
    # and its handlers detached, before the next test starts.

    test "reports an event to influxdb", %{influx_writer: influx_writer, metrics: metrics} do
      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{my_tag: "hello"})
      assert_receive {"my_event.request.stop", [my_tag: "hello"], 1}
    end

    test "does not report when the measurement is nil", %{
      influx_writer: influx_writer,
      metrics: metrics
    } do
      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})
      :telemetry.execute([:my_event, :request, :stop], %{}, %{my_tag: "hello"})
      refute_received {"my_event.request.stop", [my_tag: "hello"], nil}
    end

    test "reports only when keep function allows it", %{influx_writer: influx_writer} do
      metrics = [
        Metrics.counter("my_event.request.stop.duration",
          keep: fn metadata -> match?(%{keep_it: true}, metadata) end
        )
      ]

      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{keep_it: false})
      refute_received {"my_event.request.stop", [], 1}
      :telemetry.execute([:my_event, :request, :stop], %{duration: 1}, %{keep_it: true})
      assert_receive {"my_event.request.stop", [], 1}
    end

    test "reports measurement based on measurement/1 function", %{influx_writer: influx_writer} do
      measurement_convertor = fn measurements -> measurements.duration / 60 end

      metrics = [
        Metrics.counter("my_event.request.stop.duration",
          measurement: measurement_convertor
        )
      ]

      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})
      :telemetry.execute([:my_event, :request, :stop], %{duration: 120})
      assert_receive {"my_event.request.stop", [], 2.0}
    end

    test "reports measurement based on measurement/2 function", %{influx_writer: influx_writer} do
      measurement_multiplier = fn measurements, metadata ->
        measurements.duration * metadata.scale
      end

      metrics = [
        Metrics.counter("my_event.request.stop.duration",
          measurement: measurement_multiplier
        )
      ]

      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})
      :telemetry.execute([:my_event, :request, :stop], %{duration: 120}, %{scale: 1000})
      assert_receive {"my_event.request.stop", [], 120_000}
    end

    test "logs the error when is not able to write to influx_writer", %{metrics: metrics} do
      capture_runtime_error = fn ->
        faulty_writer = fn _, _, _ -> raise RuntimeError end
        start_supervised!({SUT, metrics: metrics, influx_writer: faulty_writer})
        :telemetry.execute([:my_event, :request, :stop], %{duration: 120}, %{scale: 1000})
      end

      assert capture_log(capture_runtime_error) =~ "RuntimeError"
    end

    test "tags value", %{influx_writer: influx_writer} do
      metrics = [
        Metrics.summary("my_client.request.stop", [
          {:event_name, [:my_client, :request, :stop]},
          {:measurement, :duration},
          {:tags, [:http_status_code]},
          {:tag_values,
           fn meta ->
             case meta.response do
               {:ok, %{status_code: status_code}} -> %{http_status_code: status_code}
               {:error, _} -> %{http_status_code: 0}
             end
           end}
        ])
      ]

      start_supervised!({SUT, metrics: metrics, influx_writer: influx_writer})

      :telemetry.execute([:my_client, :request, :stop], %{duration: 120}, %{
        response: {:ok, %{status_code: 200}}
      })

      assert_receive {"my_client.request.stop", [http_status_code: 200], 120}
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment