Logstatsh dogstatsd output
# Installation:
# - `gem install dogstatsd-ruby`
# - Drop `dogstatsd.rb` into the logstatsh outputs folder
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
# statsd is a network daemon for aggregating statistics, such as counters and timers,
# and shipping over UDP to backend services, such as Graphite or Datadog.
# The most basic coverage of this plugin is that the 'namespace', 'sender', and
# 'metric' names are combined into the full metric path like so:
# namespace.sender.metric
# The general idea is that you send statsd count or latency data and every few
# seconds it will emit the aggregated values to the backend. Example aggregates are
# average, max, stddev, etc.
# You can learn about statsd here:
# * <>
# * <>
# A simple example usage of this is to count HTTP hits by response code; to learn
# more about that, check out the [log metrics tutorial](../tutorials/metrics-from-logs)
# The default final metric sent to statsd would look like this:
# namespace.sender.metric
# With regards to this plugin, the default namespace is "logstash", the default sender
# is the ${host} field, and the metric name depends on what is set as the metric name
# in the increment, decrement, timing, count, set or gauge variable.
class LogStash::Outputs::Dogstatsd < LogStash::Outputs::Base
## Regex stolen from statsd code
config_name "dogstatsd"
milestone 2
# The address of the statsd server.
config :host, :validate => :string, :default => "localhost"
# The port to connect to on your statsd server.
config :port, :validate => :number, :default => 8125
# The statsd namespace to use for this metric.
config :namespace, :validate => :string, :default => "logstash"
# The name of the sender. Dots will be replaced with underscores.
config :sender, :validate => :string, :default => "%{host}"
# An increment metric. Metric names as array.
config :increment, :validate => :array, :default => []
# A decrement metric. Metric names as array.
config :decrement, :validate => :array, :default => []
# A timing metric. `metric_name => duration` as hash
config :timing, :validate => :hash, :default => {}
# A count metric. `metric_name => count` as hash
config :count, :validate => :hash, :default => {}
# A set metric. `metric_name => "string"` to append as hash
config :set, :validate => :hash, :default => {}
# A gauge metric. `metric_name => gauge` as hash.
config :gauge, :validate => :hash, :default => {}
# The sample rate for the metric.
config :sample_rate, :validate => :number, :default => 1
# Enable debugging.
config :debug, :validate => :boolean, :default => false, :deprecated => "This setting was never used by this plugin. It will be removed soon."
def register
@client =, @port)
end # def register
def receive(event)
return unless output?(event)
# Get the tags from the event.
tags = ["instance:some_instance"]
@client.namespace = event.sprintf(@namespace) if not @namespace.empty?
@logger.debug? and @logger.debug("Original sender: #{@sender}")
sender = event.sprintf(@sender)
@logger.debug? and @logger.debug("Munged sender: #{sender}")
@logger.debug? and @logger.debug("Event: #{event}")
@increment.each do |metric|
@client.increment(build_stat(event.sprintf(metric), sender), :sample_rate => @sample_rate, :tags => tags)
@decrement.each do |metric|
@client.decrement(build_stat(event.sprintf(metric), sender), :sample_rate => @sample_rate, :tags => tags)
@count.each do |metric, val|
@client.count(build_stat(event.sprintf(metric), sender),
event.sprintf(val).to_f, :sample_rate => @sample_rate, :tags => tags)
@timing.each do |metric, val|
@client.timing(build_stat(event.sprintf(metric), sender),
event.sprintf(val).to_f, :sample_rate => @sample_rate, :tags => tags)
@set.each do |metric, val|
@client.set(build_stat(event.sprintf(metric), sender),
event.sprintf(val), :sample_rate => @sample_rate, :tags => tags)
@gauge.each do |metric, val|
@client.gauge(build_stat(event.sprintf(metric), sender),
event.sprintf(val).to_f, :sample_rate => @sample_rate, :tags => tags)
end # def receive
def build_stat(metric, sender=@sender)
sender = sender.gsub('::','.').gsub(RESERVED_CHARACTERS_REGEX, '_').gsub(".", "_")
metric = metric.gsub('::','.').gsub(RESERVED_CHARACTERS_REGEX, '_')
@logger.debug? and @logger.debug("Formatted value", :sender => sender, :metric => metric)
return "#{sender}.#{metric}"
end # class LogStash::Outputs::Statsd
# --------------------------------------------------
# Inline Dogstatsd
# Do not edit below this line.
# --------------------------------------------------
require 'socket'
# = Statsd: A DogStatsd client (
# @example Set up a global Statsd client for a server on localhost:8125
# require 'statsd'
# $statsd = 'localhost', 8125
# @example Send some stats
# $statsd.increment 'page.views'
# $statsd.timing 'page.load', 320
# $statsd.gauge '', 100
# @example Use {#time} to time the execution of a block
# $statsd.time('account.activate') { @account.activate! }
# @example Create a namespaced statsd client and increment 'account.activate'
# statsd = 'localhost', 8125, :namespace => 'account'
# statsd.increment 'activate'
# @example Create a statsd client with global tags
# statsd = 'localhost', 8125, :tags => 'tag1:true'
class Dogstatsd
# Create a dictionary to assign a key to every parameter's name, except for tags (treated differently)
# Goal: Simple and fast to add some other parameters
['date_happened', 'd'],
['hostname', 'h'],
['aggregation_key', 'k'],
['priority', 'p'],
['source_type_name', 's'],
['alert_type', 't']
# A namespace to prepend to all statsd calls. Defaults to no namespace.
attr_reader :namespace
# StatsD host. Defaults to
attr_reader :host
# StatsD port. Defaults to 8125.
attr_reader :port
# Global tags to be added to every statsd call. Defaults to no tags.
attr_reader :tags
class << self
# Set to a standard logger instance to enable debug logging.
attr_accessor :logger
# Return the current version of the library.
def self.VERSION
# @param [String] host your statsd host
# @param [Integer] port your statsd port
# @option opts [String] :namespace set a namespace to be prepended to every metric name
# @option opts [Array<String>] :tags tags to be added to every metric
def initialize(host = DEFAULT_HOST, port = DEFAULT_PORT, opts = {}), self.port = host, port
@prefix = nil
@socket =
self.namespace = opts[:namespace]
self.tags = opts[:tags]
def namespace=(namespace) #:nodoc:
@namespace = namespace
@prefix = namespace.nil? ? nil : "#{namespace}."
def host=(host) #:nodoc:
@host = host || ''
def port=(port) #:nodoc:
@port = port || 8125
def tags=(tags) #:nodoc:
@tags = tags || []
# Sends an increment (count = 1) for the given stat to the statsd server.
# @param [String] stat stat name
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @see #count
def increment(stat, opts={})
count stat, 1, opts
# Sends a decrement (count = -1) for the given stat to the statsd server.
# @param [String] stat stat name
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @see #count
def decrement(stat, opts={})
count stat, -1, opts
# Sends an arbitrary count for the given stat to the statsd server.
# @param [String] stat stat name
# @param [Integer] count count
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
def count(stat, count, opts={})
send_stats stat, count, :c, opts
# Sends an arbitary gauge value for the given stat to the statsd server.
# This is useful for recording things like available disk space,
# memory usage, and the like, which have different semantics than
# counters.
# @param [String] stat stat name.
# @param [Numeric] value gauge value.
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @example Report the current user count:
# $statsd.gauge('user.count', User.count)
def gauge(stat, value, opts={})
send_stats stat, value, :g, opts
# Sends a value to be tracked as a histogram to the statsd server.
# @param [String] stat stat name.
# @param [Numeric] value histogram value.
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @example Report the current user count:
# $statsd.histogram('user.count', User.count)
def histogram(stat, value, opts={})
send_stats stat, value, :h, opts
# Sends a timing (in ms) for the given stat to the statsd server. The
# sample_rate determines what percentage of the time this report is sent. The
# statsd server then uses the sample_rate to correctly track the average
# timing for the stat.
# @param [String] stat stat name
# @param [Integer] ms timing in milliseconds
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
def timing(stat, ms, opts={})
send_stats stat, ms, :ms, opts
# Reports execution time of the provided block using {#timing}.
# @param [String] stat stat name
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @yield The operation to be timed
# @see #timing
# @example Report the time (in ms) taken to activate an account
# $statsd.time('account.activate') { @account.activate! }
def time(stat, opts={})
start =
result = yield
timing(stat, (( - start) * 1000).round, opts)
# Sends a value to be tracked as a set to the statsd server.
# @param [String] stat stat name.
# @param [Numeric] value set value.
# @param [Hash] opts the options to create the metric with
# @option opts [Numeric] :sample_rate sample rate, 1 for always
# @option opts [Array<String>] :tags An array of tags
# @example Record a unique visitory by id:
# $statsd.set('visitors.uniques',
def set(stat, value, opts={})
send_stats stat, value, :s, opts
# This end point allows you to post events to the stream. You can tag them, set priority and even aggregate them with other events.
# Aggregation in the stream is made on hostname/event_type/source_type/aggregation_key.
# If there's no event type, for example, then that won't matter;
# it will be grouped with other events that don't have an event type.
# @param [String] title Event title
# @param [String] text Event text. Supports \n
# @param [Hash] opts the additional data about the event
# @option opts [Time, nil] :date_happened (nil) Assign a timestamp to the event. Default is now when none
# @option opts [String, nil] :hostname (nil) Assign a hostname to the event.
# @option opts [String, nil] :aggregation_key (nil) Assign an aggregation key to the event, to group it with some others
# @option opts [String, nil] :priority ('normal') Can be "normal" or "low"
# @option opts [String, nil] :source_type_name (nil) Assign a source type to the event
# @option opts [String, nil] :alert_type ('info') Can be "error", "warning", "info" or "success".
# @option opts [Array<String>, nil] :source_type_name (nil) An array of tags
# @example Report an aweful event:
# $statsd.event('Something terrible happened', 'The end is near if we do nothing', :alert_type=>'warning', :tags=>['end_of_times','urgent'])
def event(title, text, opts={})
event_string = format_event(title, text, opts)
raise "Event #{title} payload is too big (more that 8KB), event discarded" if event_string.length > 8 * 1024
send_to_socket event_string
def format_event(title, text, opts={})
escape_event_content title
escape_event_content text
event_string_data = "_e{#{title.length},#{text.length}}:#{title}|#{text}"
# We construct the string to be sent by adding '|key:value' parts to it when needed
# All pipes ('|') in the metada are removed. Title and Text can keep theirs
OPTS_KEYS.each do |name_key|
if name_key[0] != 'tags' && opts[name_key[0].to_sym]
value = opts[name_key[0].to_sym]
rm_pipes value
event_string_data << "|#{name_key[1]}:#{value}"
tags = opts[:tags] || nil
# Tags are joined and added as last part to the string to be sent
if tags
tags.each do |tag|
rm_pipes tag
tags = "#{tags.join(",")}" unless tags.empty?
event_string_data << "|##{tags}"
raise "Event #{title} payload is too big (more that 8KB), event discarded" if event_string_data.length > 8 * 1024
return event_string_data
def escape_event_content(msg)
msg = msg.sub! "\n", "\\n"
def rm_pipes(msg)
msg = msg.sub! "|", ""
def send_stats(stat, delta, type, opts={})
sample_rate = opts[:sample_rate] || 1
if sample_rate == 1 or rand < sample_rate
# Replace Ruby module scoping with '.' and reserved chars (: | @) with underscores.
stat = stat.to_s.gsub('::', '.').tr(':|@', '_')
rate = "|@#{sample_rate}" unless sample_rate == 1
ts = (tags || []) + (opts[:tags] || [])
tags = "|##{ts.join(",")}" unless ts.empty?
send_to_socket "#{@prefix}#{stat}:#{delta}|#{type}#{rate}#{tags}"
def send_to_socket(message)
self.class.logger.debug { "Statsd: #{message}" } if self.class.logger
@socket.send(message, 0, @host, @port)
rescue => boom
self.class.logger.error { "Statsd: #{boom.class} #{boom}" } if self.class.logger
