Skip to content

Instantly share code, notes, and snippets.

@mbuhot
Created March 26, 2018 22:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mbuhot/93eab4472901eb18294d4eff1456aff6 to your computer and use it in GitHub Desktop.
Save mbuhot/93eab4472901eb18294d4eff1456aff6 to your computer and use it in GitHub Desktop.
StatsD middleware for Exq
defmodule ExqStatsD do
@behaviour Exq.Middleware.Behaviour
alias Exq.Middleware.Pipeline
@moduledoc """
This behavior will send stats to statsd endpoint for monitoring.
"""
require Logger
def before_work(pipeline = %Pipeline{assigns: %{job: job, worker_module: worker_module}}) do
module = Macro.to_string(worker_module)
queue = job.queue
retry_count = job.retry_count || 0
ExStatsD.increment("jobs.#{module}.count")
ExStatsD.increment("jobs.#{queue}.count")
ExStatsD.increment("jobs.count")
if retry_count > 0 do
ExStatsD.increment("jobs.#{module}.retry")
ExStatsD.increment("jobs.#{queue}.retry")
end
with {:ok, size} <- Exq.Api.queue_size(Exq.Api, queue) do
ExStatsD.gauge(size, "jobs.#{queue}.queue_size")
end
pipeline
end
def after_processed_work(pipeline = %Pipeline{assigns: assigns}) do
job = assigns.job
module = Macro.to_string(assigns.worker_module)
duration = assigns.duration
ExStatsD.increment("jobs.#{module}.success")
ExStatsD.increment("jobs.#{job.queue}.success")
ExStatsD.increment("jobs.success")
ExStatsD.timer(duration, "jobs.#{module}.perform")
pipeline
end
def after_failed_work(pipeline = %Pipeline{assigns: %{job: job, worker_module: worker_module}}) do
module = Macro.to_string(worker_module)
ExStatsD.increment("jobs.#{job.queue}.failure")
ExStatsD.increment("jobs.#{module}.failure")
ExStatsD.increment("jobs.failure")
pipeline
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment