
@efrecon
Last active January 27, 2021 12:55

Disque to Influx Pusher

The aim of this single-file project is to automatically push updates from Disque jobs, picked from a cluster and queue, to an InfluxDB database. The implementation is meant to be given as part of the configuration of the generic disque2any bridge. This project assumes that jobs are JSON objects that are either directly serialised SenML or carry SenML in one of their fields. When SenML data is a field in the JSON object, other fields can be pushed as tags to the Influx database. For proper operation, this project depends on the json package from tcllib. When the SenML library is available, this project parses all SenML JSON arrays using the library, thus properly recognising base fields and resolving them as part of the JSON arrays. When the library cannot be accessed, the project supposes that all SenML packs are already resolved (this is the old behaviour from prior implementations).
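As an illustration of the conversion described above, the following minimal Tcl sketch turns one already-resolved SenML record into an Influx line protocol line. The helper name `senmlToLine` and the sample record are hypothetical; unlike the script below, the sketch performs no escaping or type guessing, and ignores `vs`, `vb`, `s` and the tag options.

```tcl
package require Tcl 8.5

# Hypothetical helper (not part of the script): convert one resolved SenML
# record, given as a Tcl dict, into an Influx line protocol line. The name
# n becomes the measurement, u a "unit" tag, v the "value" field and t
# (seconds) a timestamp in milliseconds. No escaping is performed.
proc senmlToLine { pack {field "value"} } {
    set line [dict get $pack n]
    if { [dict exists $pack u] } {
        append line ",unit=[dict get $pack u]"
    }
    append line " $field=[dict get $pack v]"
    if { [dict exists $pack t] } {
        # wide() keeps 64 bits, enough for millisecond timestamps.
        append line " [expr {wide([dict get $pack t] * 1000)}]"
    }
    return $line
}

puts [senmlToLine {n urn:dev:temp u Cel v 23.1 t 1500000000}]
# prints: urn:dev:temp,unit=Cel value=23.1 1500000000000
```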

Configuration Options

The project can be configured using a number of options. When interfacing with disque2any, the best solution consists of creating environment variables named after the options. For a given dash-led option, remove the leading dash, change it to upper case and insert INFLUX_ at the beginning, where INFLUX is the uppercased basename of the implementation file. For example, the option -host is set through the environment variable INFLUX_HOST.
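That naming rule can be sketched with the same string operations the script uses. The procedure name `envName` and the example paths are hypothetical.

```tcl
# Hypothetical helper: name of the environment variable read for a dash-led
# option, using the same recipe as the script (basename of the script file,
# upper-cased, an underscore, then the option without its dash, upper-cased).
proc envName { opt script } {
    set prefix [string toupper [file rootname [file tail $script]]]
    return ${prefix}_[string toupper [string trimleft $opt -]]
}

puts [envName -host /opt/exts/influx.tcl]  ;# INFLUX_HOST
puts [envName -typeguess influx.tcl]       ;# INFLUX_TYPEGUESS
```

Renaming the implementation file, e.g. to `metrics.tcl`, would therefore change the prefix to METRICS_.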

The options -password, -tags, -mapping and -ignore can have a value that starts with an @-sign. In that case, the remaining characters form the path to a file containing the true value of the option. This is useful for secret or long values.
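The redirection can be sketched as follows, assuming the default `@` redirect character; `resolveValue` and the temporary file name are made up for the example. Like the script, the sketch trims whitespace, including the trailing newline, around the file content.

```tcl
# Hypothetical helper: resolve a value that may be redirected to a file.
proc resolveValue { value {redirect "@"} } {
    if { [string index $value 0] ne $redirect } {
        return $value
    }
    # Remaining characters are the path; the file content, trimmed of
    # surrounding whitespace, becomes the value.
    set fpath [string trim [string range $value 1 end]]
    set fd [open $fpath]
    set content [string trim [read $fd]]
    close $fd
    return $content
}

# Usage: store a secret in a file (hypothetical name) and read it back.
set tmp [file join [pwd] influx-password.txt]
set fd [open $tmp w]
puts $fd "s3cret"
close $fd
puts [resolveValue @$tmp]       ;# s3cret
puts [resolveValue plainvalue]  ;# plainvalue
file delete $tmp
```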

The dash-led options accepted are as follows:

  • -host Hostname of the influx server to post updates to.
  • -port Port number of the influx server to post updates to.
  • -username Username at the influx server.
  • -password Password at the influx server. The content of this option can be redirected from a file using a leading @-sign as described above.
  • -db Name of the influx database to post to.
  • -protocol Protocol to use when posting, defaults to http.
  • -root Root of the API, can be used if your influx server is behind a reverse proxy, defaults to an empty string.
  • -field The name of the measurement automatically comes from the SenML name; this option contains the name of the field that will hold the value from the SenML update. Defaults to value.
  • -mapping An even-length list that alternately names fields picked from the JSON object and the tags they are mapped to in the Influx update. Fields mapped to an empty tag name are checked for presence but not pushed. Undefined behaviour will happen if the list is non-empty and -senml is empty. The content of this option can be redirected from a file using a leading @-sign as described above.
  • -unit Name of the influx tag that will contain the SenML unit, if present. When empty, the unit is not pushed.
  • -senml Name of the JSON field that contains JSON-serialised SenML. When empty, the entire content of the job being posted to Disque is considered to be a SenML array.
  • -tags An even-length list that alternately contains tag names and their values; these tags are added to each influx update. The content of this option can be redirected from a file using a leading @-sign as described above.
  • -truncate Maximum number of characters of the Influx server's response to print out on errors. A zero or negative value, such as the default -1, prints the whole response.
  • -redirect Character to detect redirections of values to file paths. This is @ by default and changing it is at your own risk.
  • -ignore List of glob-style patterns to match against the resulting influx update line. When the line matches any of them, the update is ignored. Defaults to *null*.
  • -typeguess Guess the data type of the value of v in SenML packs: numbers and booleans are pushed as-is, anything else as a quoted string. This should be a boolean and is on by default.
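To show how -mapping, -tags and -ignore combine, here is a minimal sketch with hypothetical option values and a job that has already been decoded into a Tcl dict (the script itself uses tcllib's json2dict). Tags are appended in the same order as in the script (mapped fields first, then -tags; the unit tag is left out), and escaping is skipped for brevity. The procedure names are hypothetical.

```tcl
# Hypothetical option values, for illustration only.
set mapping {device dev location loc}  ;# JSON field, then tag name
set tags    {source disque}            ;# tag name, then value
set ignore  {*null*}                   ;# glob patterns (the default)

# Tags contributed by a job: mapped fields first, then the -tags pairs.
# Fields mapped to an empty tag name are skipped, as in the script.
proc tagsFor { job mapping tags } {
    set out ""
    foreach {f t} $mapping {
        if { $t ne "" } { append out ",$t=[dict get $job $f]" }
    }
    foreach {k v} $tags { append out ",$k=$v" }
    return $out
}

# A line is dropped as soon as it matches one of the -ignore patterns.
proc ignored { line patterns } {
    foreach ptn $patterns {
        if { [string match $ptn $line] } { return 1 }
    }
    return 0
}

set job {device sensor-1 location kitchen}
set line "temperature[tagsFor $job $mapping $tags] value=21.5"
puts $line                                      ;# temperature,dev=sensor-1,loc=kitchen,source=disque value=21.5
puts [ignored $line $ignore]                    ;# 0
puts [ignored "temperature value=null" $ignore] ;# 1
```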

Interfacing with disque2any

Procedure Call

You should arrange for the procedure called ingest to be the one that is bound to the Disque queue in the disque2any configuration. See details in the disque2any manual.
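The script calls a `debug` command (`debug message level`) that it never defines; it is presumably supplied by disque2any. To source the file by hand, for example to test `ingest`, a stand-in could be defined first. The sketch below is an assumption, not part of disque2any, and only prints to stderr. Keep in mind that `ingest` returns 1 to acknowledge a job and 0 to leave it in the queue for a retry.

```tcl
# Hypothetical stand-in for the debug command provided by the host; it is
# only defined when no debug command exists yet.
if { [info commands debug] eq "" } {
    proc debug { msg { lvl NOTICE } } {
        # The script uses the levels DEBUG, INFO, NOTICE and WARN.
        puts stderr "\[$lvl\] $msg"
    }
}

debug "Running outside of disque2any" INFO  ;# prints [INFO] ... on stderr
```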

Slave Options

As disque2any runs implementations in sandboxed interpreters, you will have to do some extra configuration work through the options that can be specified as part of the routing specifications. Options of interest are:

  • -allow to open the firewall and make sure that your slave interpreter is able to access the influx server.
  • -access to make sure the slave interpreter is able to read the configuration files.
  • -path to modify the package access path.
  • -package to help the slave obtain the json and http packages, even though these are explicitly required at the beginning of the implementation.
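Why these options matter can be seen with plain Tcl, independently of disque2any's own option handling: a safe interpreter hides commands such as `socket` and `open`, so an implementation cannot reach the network or read files until the master grants access. This illustrates Tcl's safe interpreters only; how disque2any maps -allow and -access onto them is not shown here.

```tcl
# Plain Tcl safe interpreter, used here only to illustrate the sandbox.
set sandbox [interp create -safe]

# socket and open are hidden, so they are not listed as commands.
puts "socket: '[interp eval $sandbox {info commands socket}]'"  ;# socket: ''
puts "open: '[interp eval $sandbox {info commands open}]'"      ;# open: ''

# The master can give a hidden command back, here file access via open.
interp expose $sandbox open
puts "open: '[interp eval $sandbox {info commands open}]'"      ;# open: 'open'

interp delete $sandbox
```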
# Make this explicit, even if it should have been done from the outside. This
# allows the script to run manually if necessary.
package require http
package require json
# Dash-led options are things that can be set from the outside, e.g. setting the
# environment variable INFLUX_HOST will set the option -host (uppercase
# conversion, removal of leading dash, addition of main name of implementation
# in uppercase, _ as separator).
array set options {
-host "localhost"
-port "8086"
-username ""
-password ""
-db ""
-protocol "http"
-root ""
-field "value"
-mapping {}
-unit "unit"
-senml ""
-tags {}
-truncate -1
-redirect "@"
-ignore {*null*}
-typeguess on
obfuscate {"-password"}
id 0
senMLlog WARN
}
# Require optional SenML package.
set options(manual) [catch {package require senSML}]
if { $options(manual) } {
debug "Running with restricted senML parser" NOTICE
} else {
debug "Running with RFC8428-compliant senML parser" INFO
}
# Set options from the environment variables (e.g. content of environment
# variable INFLUX_HOST would change the option -host).
foreach k [array names options -*] {
set envvar [string toupper [file rootname [file tail [info script]]]]_[string toupper [string trimleft $k -]]
if { [info exists ::env($envvar)] } {
if { $k in $options(obfuscate) } {
debug "Setting option $k to [string repeat "*" [string length [set ::env($envvar)]]] (via environment)" NOTICE
} else {
debug "Setting option $k to [set ::env($envvar)] (via environment)" NOTICE
}
set options($k) [set ::env($envvar)]
}
if { $k in $options(obfuscate) } {
debug "Option $k is [string repeat "*" [string length $options($k)]]" INFO
} else {
debug "Option $k is $options($k)" INFO
}
}
# Read complex and/or secret options from files whenever appropriate.
foreach opt [list password tags mapping ignore] {
set opt [string trimleft $opt -]
if { [string index $::options(-$opt) 0] eq $::options(-redirect) } {
set fpath [string trim [string range $::options(-$opt) 1 end]]
debug "Reading content of $opt from $fpath" NOTICE
set fd [open $fpath]
set ::options(-$opt) [string trim [read $fd]]
close $fd
}
}
# escape -- Influx line format escaping
#
# Perform character escaping according to the Influx line manual on an
# incoming string and depending on its type: measurement, tag, field, value.
# See: https://docs.influxdata.com/influxdb/v1.6/write_protocols/line_protocol_tutorial/#special-characters-and-keywords
#
# Arguments:
# value String to escape
# type Type of string: measurement, tag, field or value
#
# Results:
# A string where all special characters for that type have been escaped.
#
# Side Effects:
# None.
proc escape { value type } {
set escapes [list]
switch -glob $type {
"m*" {
set escapes [list "," " "]
}
"t*" -
"f*" {
set escapes [list "," "=" " "]
}
"v*" {
set escapes [list "\""]
}
}
set map [list]
foreach esc $escapes {
lappend map $esc "\\$esc"
}
return [string map $map $value]
}
# ingest -- Data ingestion
#
# Receives JSON objects in our internal format from a job queue and
# converts them into calls to the HTTP Influx API. The implementation uses a
# large number of options out of the ::options global array to decide the
# names of the destination tags, etc.
#
# Arguments:
# queue Name of the Disque queue where the job was received.
# jobid Identifier of the job
# json Data for the job.
#
# Results:
# 1 when data could be pushed to the database or when data was invalid,
# 0 otherwise. This arranges for jobs to stay in the queue if connection to
# the database was lost, but to disappear if data is not valid for any
# reason (because it will never be valid, whatever happens!).
#
# Side Effects:
# None.
proc ingest { queue jobid json } {
# Parse incoming JSON and fail early on errors.
if { [catch {::json::json2dict $json} d] } {
debug "Cannot parse incoming JSON:\n$json" WARN
return 1; # We ack the job anyway since we will never be able to parse the data!
}
# Do some sort of crude validation, verifying at least that we know who this
# is for and that it looks like our internal JSON format.
set fields [list]
foreach {f t} $::options(-mapping) {
lappend fields $f
}
if { $::options(-senml) ne "" } {
lappend fields $::options(-senml)
}
foreach k $fields {
if { ![dict exists $d $k] } {
debug "Cannot find key $k in JSON: $json" WARN
return 1; # We ack the job anyway since we will never be able to parse the data!
}
}
# Extract tags from the job description that should be mapped onto each
# measurement.
set tags [dict create]
foreach {f t} $::options(-mapping) {
if { $t ne "" } {
dict set tags $t [dict get $d $f]
}
}
# Convert list of dictionaries in senML format to a list of Influx API
# compatible updates
if { $::options(-senml) eq "" } {
set senml $d
} else {
set senml [dict get $d $::options(-senml)]
}
if { $::options(manual) } {
set influx [senML2influx_manual $senml $tags]
} else {
set influx [senML2influx_rfc $senml $tags]
}
if { [llength $influx] } {
# Attempt pushing to the Influx server. On failure, we will return a 0.
# When 0 is returned, the job will remain and will be retried a number
# of times, so we have some guarantee of delivery.
debug "Pushing following data to Influx server:\n[join $influx "\n"]" DEBUG
if { [push $influx] eq "" } {
return 0
}
}
return 1
}
# push -- Push lines to InfluxDB
#
# Push the list of lines passed as a parameter to the remote InfluxDB as a
# single post. Each line must be in the InfluxDB line protocol format. When
# timestamps are present, they should be expressed in milliseconds. Once
# the connection to the remote InfluxDB has been established, content
# pushing happens in the background. Possible problems will be reported to
# the log.
#
# Arguments:
# lines List of lines in the line protocol format.
#
# Results:
# Token for the HTTP operation, empty string on errors.
#
# Side Effects:
# Connect to remote API as specified through the options and push data in
# the background
proc push { lines } {
# Build URL respecting query string format described at the
# https://docs.influxdata.com/influxdb/v1.5/tools/api/#write
# Query values are URL-encoded so that special characters, e.g. in the
# password, cannot break the query string.
set query [::http::formatQuery db $::options(-db) u $::options(-username) p $::options(-password) precision ms]
set base "$::options(-protocol)://$::options(-host):$::options(-port)$::options(-root)/write"
set url "$base?$query"
# Only log the URL without its query string to avoid leaking the password.
debug "Pushing [llength $lines] update(s) to $base" INFO
# Use internal http package, arrange for TLS support whenever possible.
# This might fail though, since we are in a safe interp.
if { $::options(-protocol) eq "https" } {
package require tls
::http::register https 443 [list ::tls::socket -tls1 1]
}
# Perform operation in asynchronous mode through the use of a
# command callback for reporting upon success (or failure).
set cmd [list ::http::geturl $url \
-query [join $lines "\n"] \
-method POST \
-binary on \
-command [list ::Done $base]]
if { [catch $cmd tok] } {
debug "Failed pushing data to $base: $tok" WARN
return ""
}
return $tok
}
# senML2influx_manual -- Manual SenML parsing
#
# Convert a list of SenML packs represented as Tcl dictionaries to a list
# of lines ready to be pushed to Influx and compatible with the line format
# protocol. Conversion picks directly from the dictionaries and supposes
# resolved packs, i.e. no support for base fields.
#
# Arguments:
# senml List of Tcl dictionaries, one for each SenML pack
# tags Dictionary of tags to add to each line
#
# Results:
# Return a list of lines in the Influx line protocol format, conversion is
# subject to rules from the options.
#
# Side Effects:
# All lines matching the -ignore option will be ignored.
proc senML2influx_manual { senml tags } {
set influx {}; # Will contain each update as a line in Influx compatible format
foreach upd $senml {
set line [pack2influx $upd $tags]
if { $line ne "" } {
lappend influx $line
}
}
return $influx
}
# pack2influx -- Convert a resolved SenML Pack
#
# Convert a SenML pack represented as a Tcl dictionary to a line ready to
# be pushed to Influx and compatible with the line format protocol.
# Conversion picks directly from the dictionaries and supposes resolved
# packs, i.e. no support for base fields.
#
# Arguments:
# upd Tcl dictionary representing the SenML pack
# tags Dictionary of tags to add to the line (optional)
#
# Results:
# Return a line in the Influx line protocol format, conversion is subject
# to rules from the options. The line might be empty on errors or when it
# should be ignored (see below)
#
# Side Effects:
# All lines matching the -ignore option will be ignored and lead to
# returning an empty line.
proc pack2influx { upd { tags {}} } {
set line ""
# We need at least a name and some value
if { [dict exists $upd "n"] && \
( [dict size [dict filter $upd key "v*"]] || \
[dict exists $upd "s"]) } {
# Construct the line in the line protocol piece by piece while
# respecting the documented escaping rules for the influx line protocol.
# Each line is decomposed in 4 different parts: the name of the
# measurements, a number of tags, the fields being set and, finally, the
# time of the update.
####
# PART 1: Measurement
#
# The name of the measurement comes directly from the name in the SenML
# pack. No questions asked!
append line "[escape [dict get $upd "n"] measurement]"
####
# PART 2: Tags
# First add tags that are passed to us, these really have gone through
# the -mapping option in prior calls.
dict for {f t} $tags {
append line ",[escape $f tag]=[escape $t tag]"
}
# The unit from SenML is pushed as a tag with the name coming from the
# -unit option. When -unit is empty, the unit will not be pushed to
# Influx, even though it could have been present in the SenML pack.
if { [dict exists $upd "u"] && $::options(-unit) ne "" } {
append line ",[escape $::options(-unit) tag]=[escape [dict get $upd u] tag]"
}
# The module can also add any number of tags and their values to
# every Influx measurement report. This can be used to tag the source,
# etc.
foreach {k v} $::options(-tags) {
append line ",[escape $k tag]=[escape $v tag]"
}
####
# PART 3: Field
#
# The influx protocol is able to associate several fields to a given
# measurement at one time. This implementation only supports one: the
# value from the SenML pack will be set under the name pointed at by the
# -field option. The implementation extracts s or vb or vs or v (in that
# order) from the SenML pack for the value.
append line " "
append line "[escape $::options(-field) field]="
if { [dict exists $upd "s"] } {
append line [dict get $upd "s"]
} else {
if { [dict exists $upd "vb"] } {
if { [dict get $upd "vb"] } {
append line 1
} else {
append line 0
}
} elseif { [dict exists $upd "vs"] } {
append line "\"[escape [dict get $upd vs] value]\""
} elseif { [dict exists $upd "v"] } {
# Guess around the value, anything that isn't a boolean or number
# will be a string. This is really outside the spec...
set val [dict get $upd "v"]
if { $::options(-typeguess) } {
if { [string is double -strict $val] || [string is boolean -strict $val] } {
append line $val
} else {
append line "\"[escape $val value]\""
}
} else {
# No guessing, v is reserved for doubles and numbers
append line "$val"
}
} else {
debug "Incoming SenML pack contains an unsupported value type!" WARN
return ""
}
}
####
# PART 4: Timestamp
#
# Add the timestamp if we have one (in milliseconds), otherwise leave it
# out and let Influx make that decision. Note that this should be in
# UTC, as per the documentation:
# https://docs.influxdata.com/influxdb/v1.6/concepts/glossary/#timestamp
if { [dict exists $upd "t"] } {
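# wide() keeps 64 bits on all platforms, whereas int() is limited to the
# machine word size, which may be too small for millisecond timestamps.
set when [expr {wide([dict get $upd "t"]*1000)}]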
append line " $when"
}
####
# PART 5: Ignore lines
#
# Once a line has been constructed, it can be ignored if it matches any
# of the glob-style patterns passed through the -ignore option. This can
# be used as crude ignore of all lines having null values inside, or for
# skipping sources known to have problems, for example.
foreach ptn $::options(-ignore) {
if { [string match $ptn $line] } {
# do nothing
debug "ignore line $line" DEBUG
return ""
}
}
}
return $line
}
proc logger { lvl msg } {
debug $msg $lvl
}
# senML2influx_rfc -- SenML parsing
#
# Convert a list of SenML packs represented as Tcl dictionaries to a list
# of lines ready to be pushed to Influx and compatible with the line format
# protocol. Conversion happens through the SenML package, meaning that base
# fields will be properly resolved as parsing proceeds.
#
# Arguments:
# senml List of Tcl dictionaries, one for each SenML pack
# tags Dictionary of tags to add to each line
#
# Results:
# Return a list of lines in the Influx line protocol format, conversion is
# subject to rules from the options.
#
# Side Effects:
# All lines matching the -ignore option will be ignored.
proc senML2influx_rfc { senml tags } {
# Create a unique global variable to collect data while parsing senML. We
# could use the same global list, but that wouldn't be future-proof in case
# this code would be interrupted through coroutines or similar.
set l ::influx\#[incr ::options(id)]
upvar \#0 $l influx
set influx [list]
# Create a SenSML context for parsing, arrange to collect in $l via
# senMLcollect procedure. Arrange to pass logging through our own debug
# proc, filtering out to the SenML level from the options (usually around
# WARN, unless problems arise).
set s [senSML \
-callback [list ::senMLcollect $l $tags] \
-log @logger \
-level $::options(senMLlog)]
$s begin
foreach upd $senml {
if { [catch {$s dictpack $upd} err] } {
debug "Could not parse SenML pack $upd: $err" WARN
}
}
$s end
$s delete
# Transfer the global variable to a local list so we can unset it and clean up the
# global state (avoid memory leaks in other words!)
set ret [set $l]
unset $l
return $ret
}
# senMLcollect -- Collect influx lines during senML parsing
#
# Convert each SenML pack to the influx line protocol and add non empty
# converted lines to the global list variable passed as a parameter. This
# procedure complies with the format required by the SenML parser.
#
# Arguments:
# l Name of global variable where to collect converted packs
# tags Dictionary of tags to add to each line
# s Identifier of the SenSML parser
# step One of OPEN, CLOSE or PACK. We only process on PACK.
# pack Dictionary representing the resolved SenML pack.
#
# Results:
# None.
#
# Side Effects:
# Empty lines will not be collected.
proc senMLcollect { l tags s step { pack {} } } {
upvar \#0 $l L
if { $step eq "PACK" } {
set line [pack2influx $pack $tags]
if { $line ne "" } {
lappend L $line
}
}
}
# Done -- HTTP cleanup
#
# Cleanup upon HTTP operation termination.
#
# Arguments:
# url URL to Influx server
# tok HTTP token, as of HTTP library
#
# Results:
# None.
#
# Side Effects:
# None.
proc Done { url tok } {
set code [::http::ncode $tok]
if { $code >= 200 && $code < 300 } {
debug "Done with push for $url" DEBUG
} else {
if { $::options(-truncate) > 0 } {
set dta [string range [::http::data $tok] 0 [expr {$::options(-truncate)-1}]]
} else {
set dta [::http::data $tok]
}
debug "Could not push to Influx at $url:\
$code -- err:[::http::error $tok]\
-- status:[::http::status $tok]\
-- data:[string trim $dta]" WARN
}
::http::cleanup $tok
}
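The write URL built by `push` carries the database, credentials and precision as query parameters. A minimal sketch of building such a URL with the http package's `::http::formatQuery`, which URL-encodes every key and value, follows; the option values, including the password, are hypothetical.

```tcl
package require http

# Hypothetical option values; in the script they come from ::options.
array set opts {
    -protocol http -host localhost -port 8086 -root ""
    -db sensors -username writer -password "p&ss word"
}

# formatQuery takes alternating keys and values, URL-encodes each of them
# and joins the pairs with &, so an & in the password stays harmless.
set query [::http::formatQuery db $opts(-db) u $opts(-username) \
               p $opts(-password) precision ms]
set base "$opts(-protocol)://$opts(-host):$opts(-port)$opts(-root)/write"
set url "$base?$query"
puts $url
```

Logging only `$base` rather than `$url` keeps the password out of the logs.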
Copyright (c) 2018, Emmanuel Frécon
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.