Skip to content

Instantly share code, notes, and snippets.

@slashdotdash
Last active July 8, 2019 11:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slashdotdash/2dba20ae50d006d8cd1e165769bc2caa to your computer and use it in GitHub Desktop.
Save slashdotdash/2dba20ae50d006d8cd1e165769bc2caa to your computer and use it in GitHub Desktop.
AWS CloudWatch reporter for Commanded telemetry.
defmodule CloudWatchReporter do
  @moduledoc """
  Reports VM and Commanded telemetry events to AWS CloudWatch.

  The `:telemetry` handlers in this module run in whichever process emits the
  event. Each handler converts the event into CloudWatch metric datums (keyword
  lists passed to `ExAws.Cloudwatch.put_metric_data/2`) and casts them to this
  GenServer. The server queues them and publishes the queue under the
  `"My/App"` namespace once a minute, and once more when it terminates.
  """

  use GenServer

  require Logger

  alias ExAws.Cloudwatch

  @namespace "My/App"

  @handler_id "cloudwatch-instrumenter"

  @events [
    [:vm, :memory],
    [:vm, :total_run_queue_lengths],
    [:commanded, :command, :dispatch, :start],
    [:commanded, :command, :dispatch, :success],
    [:commanded, :command, :dispatch, :failure],
    [:commanded, :event, :published]
  ]

  # Number of datums sent per PutMetricData request.
  @batch_size 20

  # CloudWatch "Memory" dimension value => key in the `[:vm, :memory]` measurements.
  # See http://erlang.org/doc/man/erlang.html#memory-0
  @memory_metrics %{
    "Atom" => :atom,
    "Binary" => :binary,
    "Code" => :code,
    "ETS" => :ets,
    "Processes" => :processes,
    "System" => :system,
    "Total" => :total
  }

  # CloudWatch "Queue" dimension value => key in the
  # `[:vm, :total_run_queue_lengths]` measurements.
  # See https://hexdocs.pm/telemetry_poller/Telemetry.Poller.html#module-run-queue-lengths
  @queue_length_metrics %{
    "CPU" => :cpu,
    "IO" => :io,
    "Total" => :total
  }

  @doc """
  Starts the reporter registered under the module name, which is the
  destination the telemetry handlers cast their metrics to.
  """
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @impl GenServer
  def init(_args) do
    # Trap exits so terminate/2 runs on supervisor shutdown and flushes the queue.
    Process.flag(:trap_exit, true)

    :ok = Environment.init()

    # attach_many/4 returns {:error, :already_exists} if a handler with this id
    # is still attached (e.g. after a restart in which terminate/2 did not run).
    # That existing attachment already routes to this module, so the result is
    # intentionally not asserted.
    :telemetry.attach_many(@handler_id, @events, &__MODULE__.handle_event/4, nil)

    enqueue_send_metrics_timer()

    {:ok, []}
  end

  @impl GenServer
  def handle_call(:get_queue, _from, queue) do
    {:reply, {:ok, queue}, queue}
  end

  @impl GenServer
  def handle_cast({:put_metrics, metrics}, queue) do
    # The queue is kept newest-first; reversing the batch onto it keeps that
    # order without walking the existing queue.
    {:noreply, Enum.reverse(metrics, queue)}
  end

  @impl GenServer
  def handle_info(:send_metrics, queue) do
    do_send_metrics(queue)
    enqueue_send_metrics_timer()

    {:noreply, []}
  end

  def handle_info(message, queue) do
    Logger.info(fn ->
      "CloudWatch reporter unexpectedly received message: " <> inspect(message)
    end)

    {:noreply, queue}
  end

  # Stop routing events to a process that is going away, then send any
  # queued metrics. The return value of terminate/2 is ignored by OTP.
  @impl GenServer
  def terminate(_reason, queue) do
    :telemetry.detach(@handler_id)
    do_send_metrics(queue)
    :ok
  end

  @doc """
  `:telemetry` handler for the events this reporter attaches to.

  Builds the CloudWatch datums for the event and casts them to the reporter
  process. `:telemetry` detaches a handler that raises, so VM measurements
  that are missing or not integers are skipped rather than matched strictly.
  """
  def handle_event([:vm, :memory], measurements, _metadata, _opts) do
    measurements
    |> vm_metrics(@memory_metrics, "VMMemory", "Bytes", "Memory")
    |> put_metrics()
  end

  def handle_event([:vm, :total_run_queue_lengths], measurements, _metadata, _opts) do
    measurements
    |> vm_metrics(@queue_length_metrics, "VMQueueLength", "Count", "Queue")
    |> put_metrics()
  end

  def handle_event(
        [:commanded, :command, :dispatch, :start],
        _measurements,
        %{command: command},
        _opts
      ) do
    put_metrics([count_datum("CommandDispatch", "Command", command, DateTime.utc_now())])
  end

  def handle_event(
        [:commanded, :command, :dispatch, :success],
        %{duration: duration},
        %{command: command},
        _opts
      ) do
    # One timestamp so both datums from this event line up in CloudWatch.
    timestamp = DateTime.utc_now()

    put_metrics([
      count_datum("CommandDispatchSuccess", "Command", command, timestamp),
      [
        metric_name: "CommandDispatchDuration",
        value: duration,
        # NOTE(review): assumes the emitter reports `duration` in microseconds — confirm.
        unit: "Microseconds",
        dimensions: %{"Command" => module_name(command)},
        timestamp: timestamp
      ]
    ])
  end

  def handle_event(
        [:commanded, :command, :dispatch, :failure],
        _measurements,
        %{command: command},
        _opts
      ) do
    put_metrics([count_datum("CommandDispatchFailure", "Command", command, DateTime.utc_now())])
  end

  def handle_event(
        [:commanded, :event, :published],
        %{timestamp: timestamp},
        %{event: event},
        _opts
      ) do
    put_metrics([count_datum("EventPublish", "Event", event, to_utc_datetime(timestamp))])
  end

  # Builds one datum per `{dimension_value, key}` pair whose measurement is an
  # integer, tagged with this host. Missing or non-integer values are dropped.
  defp vm_metrics(measurements, keys, metric_name, unit, dimension) do
    hostname = Environment.hostname()
    timestamp = DateTime.utc_now()

    Enum.flat_map(keys, fn {name, key} ->
      case Map.get(measurements, key) do
        value when is_integer(value) ->
          [
            [
              metric_name: metric_name,
              value: value,
              unit: unit,
              dimensions: %{"Hostname" => hostname, dimension => name},
              timestamp: timestamp
            ]
          ]

        _missing_or_unexpected ->
          []
      end
    end)
  end

  # A datum counting one occurrence, dimensioned by the struct's short module name.
  defp count_datum(metric_name, dimension, struct, timestamp) do
    [
      metric_name: metric_name,
      value: 1,
      unit: "None",
      dimensions: %{dimension => module_name(struct)},
      timestamp: timestamp
    ]
  end

  # Normalises an event timestamp to a DateTime. A NaiveDateTime is treated as
  # UTC; anything else falls back to the current time.
  defp to_utc_datetime(%NaiveDateTime{} = timestamp),
    do: DateTime.from_naive!(timestamp, "Etc/UTC")

  defp to_utc_datetime(%DateTime{} = timestamp), do: timestamp
  defp to_utc_datetime(_other), do: DateTime.utc_now()

  defp put_metrics([]), do: :ok
  defp put_metrics(metrics), do: GenServer.cast(__MODULE__, {:put_metrics, metrics})

  defp do_send_metrics([]), do: :ok

  defp do_send_metrics(queue) do
    # The queue is newest-first; restore arrival order before batching.
    queue
    |> Enum.reverse()
    |> Enum.chunk_every(@batch_size)
    |> Enum.each(&send_batch/1)
  end

  # Sends one PutMetricData request; failures are logged and the batch is dropped.
  defp send_batch(batch) do
    request = Cloudwatch.put_metric_data(batch, @namespace)

    case ExAws.request(request) do
      {:ok, _result} ->
        :ok

      {:error, error} = reply ->
        Logger.warn(fn -> "Failed to put CloudWatch metric data due to: " <> inspect(error) end)
        reply
    end
  end

  # Send metrics to CloudWatch with at least a one minute delay.
  defp enqueue_send_metrics_timer do
    Process.send_after(self(), :send_metrics, :timer.minutes(1))
  end

  # Last segment of the struct's module name, e.g. MyApp.Commands.Foo -> "Foo".
  defp module_name(%module{}), do: module |> Module.split() |> List.last()
end
defmodule Environment do
  @moduledoc """
  Holds the host name reported as the "Hostname" dimension on CloudWatch
  metrics, stored under the `:hostname` key of the `:my_app` application
  environment.
  """

  @doc """
  Ensures a host name is configured, returning `:ok`.

  A binary `:hostname` that is already set is left untouched. When none is set,
  the local host name from `:inet.gethostname/0` is stored. Any other configured
  value raises `CaseClauseError`.
  """
  def init do
    case hostname() do
      nil ->
        Application.put_env(:my_app, :hostname, local_hostname())

      configured when is_binary(configured) ->
        :ok
    end
  end

  @doc "Returns the configured host name, or `nil` if none is set."
  def hostname, do: Application.get_env(:my_app, :hostname)

  # `:inet.gethostname/0` yields a charlist; convert it to a binary.
  defp local_hostname do
    {:ok, charlist} = :inet.gethostname()
    List.to_string(charlist)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment