Telegraf Socket Listener Input Plugin

One of Telegraf's biggest strengths is its large collection of plugins, which you can use to immediately start collecting data from a variety of applications. This covers a lot of common infrastructure components like databases and web servers, but if you want to get data into Telegraf using your own scripts or applications, there are also a variety of more general-purpose input plugins that you can use. There's a file plugin, which will read from or tail a file on disk; an exec plugin, which will execute your own scripts or commands and parse their output; and plugins to listen for data via HTTP or poll external endpoints (http_listener_v2 and http).

One of the plugins I end up using most often is the socket_listener plugin, which allows you to send data to Telegraf using UDP, TCP, or Unix sockets. The simplicity of InfluxDB's line protocol and the ease with which you can write to a socket using most programming languages makes this a powerful tool for both quick prototyping and more long-term solutions.

Let's take a look at some examples of the plugin in action. We'll use a host system running Ubuntu, with Telegraf and InfluxDB installed from the official InfluxData repository.

UDP: User Datagram Protocol

For this first example, let's configure the socket_listener plugin to accept UDP packets. UDP is a connectionless, best-effort service: messages are sent to their recipient with no guarantee that they will be received. One benefit is that UDP has less processing overhead than something like TCP, which is more reliable. That makes UDP well suited for things like interprocess communication or situations in which some loss of data is acceptable.

In order to set up Telegraf to receive UDP messages, first configure this block in your telegraf.conf:

[[inputs.socket_listener]]
  service_address = "udp://:8094"
  data_format = "influx"

This configures Telegraf to listen for data on UDP port 8094 in InfluxDB line protocol format. A quick restart of Telegraf and it should be ready to receive data:

sudo systemctl restart telegraf

Now we can start writing some data to Telegraf via UDP. To send the data, we'll use two common command line utilities, echo and nc, or netcat. Enter the following commands into your terminal:

$ echo "m1,tag1=tag_value value=1" | nc -u -4 -q 0 localhost 8094
$ echo "m1,tag1=tag_value value=2" | nc -u -4 -q 0 localhost 8094
$ echo "m1,tag1=tag_value value=3" | nc -u -4 -q 0 localhost 8094

The InfluxDB Line Protocol is a plain text format, which means we can use the echo command to print messages in that format directly to stdout, and use the | character to "pipe" that text into nc, which will transmit the data over a network connection (in our case, on the same machine via the loopback interface). We provide several options to nc: -u, -4, and -q 0. The first tells nc to send data using UDP; the second says that we should use IPv4; and the third tells nc to quit as soon as it reaches the end of its input. The -q option takes a number of seconds to wait after end-of-input, and it is not available in every netcat variant.
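The same three points can also be sent from a script. The following is a minimal Python sketch, assuming the UDP listener configured above on port 8094. The helper names format_line and send_udp are made up for this illustration, and the formatter is deliberately simplified: it does not escape spaces or commas in tag or field values.

```python
import socket


def format_line(measurement, tags, fields):
    """Build one line-protocol line such as 'm1,tag1=tag_value value=1'.

    Simplified for illustration: no escaping, values are written as-is.
    """
    tag_part = "".join(",%s=%s" % (k, v) for k, v in sorted(tags.items()))
    field_part = ",".join("%s=%s" % (k, v) for k, v in sorted(fields.items()))
    return "%s%s %s" % (measurement, tag_part, field_part)


def send_udp(line, host="127.0.0.1", port=8094):
    """Send a single line as one UDP datagram; delivery is not guaranteed."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto((line + "\n").encode("utf8"), (host, port))


if __name__ == "__main__":
    for value in (1, 2, 3):
        send_udp(format_line("m1", {"tag1": "tag_value"}, {"value": value}))
```

As with nc, the sender gets no confirmation that Telegraf received anything, so it is still worth checking the database afterwards.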

Because UDP doesn't have any guarantees about whether the messages will be delivered, let's verify that the data has actually arrived in the database using the Influx CLI:

$ influx
Connected to http://localhost:8086 version 1.7.4
InfluxDB shell version: unknown
Enter an InfluxQL query
> use telegraf
> SELECT * FROM m1 WHERE time > now()-5m
name: m1
time                host           tag1       value
----                ----           ----       --------
1562602580414360000 noah-mbp.local tag_value  1
1562602581418350000 noah-mbp.local tag_value  2
1562602582420300000 noah-mbp.local tag_value  3

Success! Our three datapoints have been written to the database.
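If you would rather check from a script than from the CLI, InfluxDB 1.x also exposes a /query HTTP endpoint. The sketch below assumes the default address http://localhost:8086 and a database named telegraf with no authentication; build_query_url and run_query are hypothetical helper names used only for this example.

```python
import json
import urllib.parse
import urllib.request


def build_query_url(query, db="telegraf", base="http://localhost:8086"):
    """Build a URL for the InfluxDB 1.x /query endpoint."""
    params = urllib.parse.urlencode({"db": db, "q": query})
    return "%s/query?%s" % (base, params)


def run_query(query, **kwargs):
    """Run an InfluxQL query and return the decoded JSON response."""
    url = build_query_url(query, **kwargs)
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Only prints the URL; call run_query() when InfluxDB is reachable.
    print(build_query_url("SELECT * FROM m1 WHERE time > now()-5m"))
```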

Unix Socket

While UDP and TCP are network protocols, the socket_listener plugin also has the ability to listen on a Unix socket, which provides a method for exchanging data between two processes on the same host via the kernel. Even when communicating on the same host, IP sockets like TCP and UDP will send data through the loopback networking interface. A Unix socket, on the other hand, can avoid this extra work and send data directly to the receiving socket buffer, which gives it a performance edge when sending data locally. You can read more about the differences between Unix sockets and network sockets in this mailing list post titled unix domain sockets vs. internet sockets.

This time, instead of using command line tools, we'll write a small Python script that will send one-dimensional random walk data over a Unix socket to Telegraf. First, we'll configure Telegraf as follows (full config here):

[[inputs.socket_listener]]
  service_address = "unixgram:///tmp/telegraf.sock"

Then we'll run Telegraf with the new config:

$ telegraf --config socket-telegraf.conf --debug

We can see that Telegraf has created the socket by listing the contents of the /tmp directory:

$ ls /tmp
telegraf.sock
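A script can make the same check before it tries to connect. This is a small sketch using only the standard library; is_unix_socket is a name chosen for this example, and the path is the one from the config above.

```python
import os
import stat


def is_unix_socket(path):
    """Return True if path exists and is a Unix domain socket."""
    try:
        return stat.S_ISSOCK(os.stat(path).st_mode)
    except FileNotFoundError:
        return False


if __name__ == "__main__":
    print(is_unix_socket("/tmp/telegraf.sock"))
```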

Next we'll create our Python script. The full script can be found here. Following the import statements and an interrupt handler which will close the socket when we use Ctrl+C to exit our script, we'll create a socket object and connect to the socket Telegraf has opened.

# Path to the Unix socket that Telegraf has opened.
telegraf_socket = "/tmp/telegraf.sock"

# Connection to Telegraf, over a Unix datagram socket.
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

sock.connect(telegraf_socket)

And then we create a loop that first sends the current data point to InfluxDB before updating the position variable for the next run through the loop, and sleeps for a second in order to rate limit the number of points we send:

while True:
    message = "socketwriter position=%d\n" % x
    print(message)
    sock.send(message.encode('utf8'))
    x = x + random.randint(-1, 1)
    time.sleep(1)
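To try the sending side without a running Telegraf, a stand-in receiver can bind its own Unix datagram socket and print whatever arrives. The sketch below is only a test aid, not how Telegraf itself is implemented; open_receiver and the temporary socket path are assumptions made for this example.

```python
import os
import socket
import tempfile


def open_receiver(path):
    """Bind a Unix datagram socket at path, standing in for a unixgram listener."""
    if os.path.exists(path):
        os.unlink(path)  # a stale socket file would make bind() fail
    receiver = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    receiver.bind(path)
    return receiver


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "test.sock")
    receiver = open_receiver(path)

    # Send one point the same way the script does.
    sender = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    sender.connect(path)
    sender.send("socketwriter position=0\n".encode("utf8"))

    print(receiver.recv(4096).decode("utf8"), end="")

    sender.close()
    receiver.close()
    os.unlink(path)
```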

With Telegraf running, let's start our script:

$ python3 unix_socket_writer.py 
socketwriter position=0

socketwriter position=0

socketwriter position=1

socketwriter position=0

Once again, we can open up the InfluxDB CLI and verify that our data has been written:

$ influx
Connected to http://localhost:8086 version 1.7.4
InfluxDB shell version: unknown
Enter an InfluxQL query
> use telegraf
> SELECT * FROM socketwriter WHERE time > now()-1m
name: socketwriter
time                host           position
----                ----           --------
1562602580414360000 noah-mbp.local 0
1562602581418350000 noah-mbp.local 0
1562602582420300000 noah-mbp.local 1
1562602583422080000 noah-mbp.local 0

Looks good!

Use it in your own projects!

Whether you're using Unix sockets for their local performance or TCP sockets for their network robustness, Telegraf's socket_listener plugin provides one of the easiest ways to get data from your scripts and applications into Telegraf. And if that doesn't work for you, check out some of the other general-purpose plugins that Telegraf has to offer; you're sure to find something that suits your needs.
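The examples in this post cover UDP and Unix sockets but not TCP. As a rough sketch, assuming Telegraf is configured with service_address = "tcp://:8094" and data_format = "influx", a TCP sender in Python could look like the following; send_tcp is a hypothetical helper name.

```python
import socket


def send_tcp(lines, host="127.0.0.1", port=8094, timeout=5):
    """Send newline-terminated line-protocol lines over one TCP connection."""
    payload = "".join(line + "\n" for line in lines).encode("utf8")
    with socket.create_connection((host, port), timeout=timeout) as sock:
        sock.sendall(payload)


if __name__ == "__main__":
    try:
        send_tcp(["m1,tag1=tag_value value=1", "m1,tag1=tag_value value=2"])
    except OSError as err:
        # Unlike UDP, a missing listener is reported back to the sender.
        print("could not send: %s" % err)
```

Because TCP is connection-oriented, the sender finds out immediately when nothing is listening, which is one practical difference from the UDP example.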

If you have questions or need help with getting the socket_listener plugin set up, check out some of our community resources, like our public Slack or Discourse forums, or reach out to me directly on Twitter @noahcrowley.

socket-telegraf.conf

# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "2s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## For failed writes, telegraf will cache metric_buffer_limit metrics for each
## output, and will flush this buffer on a successful write. Oldest metrics
## are dropped first when this buffer fills.
## This buffer only fills when writes fail to output plugin(s).
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s.
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
precision = ""
## Logging configuration:
## Run telegraf with debug log messages.
debug = false
## Run telegraf in quiet mode (error log messages only).
quiet = false
## Specify the log file name. The empty string means to log to stderr.
logfile = ""
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
# Configuration for sending metrics to InfluxDB
[[outputs.influxdb]]
## The full HTTP or UDP URL for your InfluxDB instance.
##
## Multiple URLs can be specified for a single cluster, only ONE of the
## urls will be written to each interval.
# urls = ["unix:///var/run/influxdb.sock"]
# urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
## The target database for metrics; will be created as needed.
## For UDP url endpoint database needs to be configured on server side.
# database = "telegraf"
## The value of this tag will be used to determine the database. If this
## tag is not set the 'database' option is used as the default.
# database_tag = ""
## If true, no CREATE DATABASE queries will be sent. Set to true when using
## Telegraf with a user without permissions to create databases or when the
## database already exists.
# skip_database_creation = false
## Name of existing retention policy to write to. Empty string writes to
## the default retention policy. Only takes effect when using HTTP.
# retention_policy = ""
## Write consistency (clusters only), can be: "any", "one", "quorum", "all".
## Only takes effect when using HTTP.
# write_consistency = "any"
## Timeout for HTTP messages.
# timeout = "5s"
## HTTP Basic Auth
# username = "telegraf"
# password = "metricsmetricsmetricsmetrics"
## HTTP User-Agent
# user_agent = "telegraf"
## UDP payload size is the maximum packet size to send.
# udp_payload = "512B"
## Optional TLS Config for use on HTTP connections.
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## HTTP Proxy override, if unset values the standard proxy environment
## variables are consulted to determine which proxy, if any, should be used.
# http_proxy = "http://corporate.proxy:3128"
## Additional HTTP headers
# http_headers = {"X-Special-Header" = "Special-Value"}
## HTTP Content-Encoding for write request body, can be set to "gzip" to
## compress body or "identity" to apply no encoding.
# content_encoding = "identity"
## When true, Telegraf will output unsigned integers as unsigned values,
## i.e.: "42u". You will need a version of InfluxDB supporting unsigned
## integer values. Enabling this option will result in field type errors if
## existing data has been written.
# influx_uint_support = false
###############################################################################
# PROCESSOR PLUGINS #
###############################################################################
###############################################################################
# AGGREGATOR PLUGINS #
###############################################################################
###############################################################################
# INPUT PLUGINS #
###############################################################################
###############################################################################
# SERVICE INPUT PLUGINS #
###############################################################################
# Generic socket listener capable of handling multiple socket types.
[[inputs.socket_listener]]
## URL to listen on
# service_address = "tcp://:8094"
# service_address = "tcp://127.0.0.1:http"
# service_address = "tcp4://:8094"
# service_address = "tcp6://:8094"
# service_address = "tcp6://[2001:db8::1]:8094"
# service_address = "udp://:8094"
# service_address = "udp4://:8094"
# service_address = "udp6://:8094"
# service_address = "unix:///tmp/telegraf.sock"
service_address = "unixgram:///tmp/telegraf.sock"
## Maximum number of concurrent connections.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# max_connections = 1024
## Read timeout.
## Only applies to stream sockets (e.g. TCP).
## 0 (default) is unlimited.
# read_timeout = "30s"
## Optional TLS configuration.
## Only applies to stream sockets (e.g. TCP).
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Enables client authentication if set.
# tls_allowed_cacerts = ["/etc/telegraf/clientca.pem"]
## Maximum socket buffer size (in bytes when no unit specified).
## For stream sockets, once the buffer fills up, the sender will start backing up.
## For datagram sockets, once the buffer fills up, metrics will start dropping.
## Defaults to the OS default.
# read_buffer_size = "64KiB"
## Period between keep alive probes.
## Only applies to TCP sockets.
## 0 disables keep alive probes.
## Defaults to the OS configuration.
# keep_alive_period = "5m"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
# data_format = "influx"
unix_socket_writer.py

#!/usr/bin/env python3
import sys
import signal
import socket
import time
import random


# This code waits for the Unix signal "SIGINT", which can be sent by typing
# "ctrl+c" on the keyboard. When the program receives that signal, it executes
# the signal_handler function, which closes the socket and exits the program.
def signal_handler(signum, frame):
    print('Stopping...')
    sock.close()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)

# The initial value for our measurements.
x = 0

# Path to the Unix socket that Telegraf has opened.
telegraf_socket = "/tmp/telegraf.sock"

# Connection to Telegraf, over a Unix datagram socket.
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.connect(telegraf_socket)

# We start a loop which will run until the program is exited (using "ctrl+c").
# We then create a message in line protocol, print it for debugging purposes, and
# send it to Telegraf over the socket connection. Finally, we generate a new value
# using the previous value and a random value, and sleep for the specified time.
# Then we repeat.
while True:
    message = "socketwriter position=%d\n" % x
    print(message)
    sock.send(message.encode('utf8'))
    x = x + random.randint(-1, 1)
    time.sleep(1)
kadma1 commented Mar 9, 2020

Hi, I have configured two Telegraf agents. One collects metrics from the host and sends them to the other. First Telegraf configuration:
[[outputs.http]]
url = "https://telegraftarget.net"
data_format = "influx"

Second telegraf configuration:
[[inputs.socket_listener]]
## URL to listen on
service_address = "tcp://:8090"
data_format = "influx"

and I see a lot of error lines.
On the first Telegraf:
020-03-09T12:43:45Z E! [agent] Error writing to outputs.http: Post https://telegraftarget.net: net/http: request canceled (Client.Timeout exceeded while awaiting headers)

And on the second Telegraf:

2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:7: "POST / HTTP/1.1"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:28: "User-Agent: Telegraf/1.13.4"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:23: "Content-Length: 275140"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:26: "Content-Type: text/plain; charset=utf-8"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:22: "Accept-Encoding: gzip"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:51: "Host: telegraftarget-accprometheus.xaas.pl.ing.net"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:63: "X-Forwarded-Host: telegraftarget-accprometheus.xaas.pl.ing.net"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:22: "X-Forwarded-Port: 443"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:25: "X-Forwarded-Proto: https"
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:22: "Forwarded: for=10.111.0.246;host=telegraftargetl.ing.net;proto=https;proto-version="
2020-03-09T13:05:00Z E! [inputs.socket_listener] Unable to parse incoming line: metric parse error: expected field at 1:30: "X-Forwarded-For: 10.111.0.246"

What should be set, and where, to resolve the errors above? Some headers on the sender side?
Thanks in advance for your help.
