Automatically push jobs into Disque from MQTT JSON data

MQTT to Disque Pusher

The aim of this single-file project is to automatically create Disque jobs from data that is pushed to an MQTT broker. The implementation is meant to be given as part of the configuration of the generic mqtt2any bridge. The project assumes that you use your MQTT topic hierarchy in a REST-routing-like manner: it can extract the values of fields between the slashes / of the MQTT topic hierarchy and copy them into the job data that is created at the remote Disque server. For proper operation, this project depends on disque, the Tcl implementation of the Disque API. The project is JSON-opinionated (the -validate option is turned on by default, see below) but can also be used for any other type of data.

Configuration and Templating

File Configuration

In its easiest form, the implementation takes a file describing how to match MQTT topics and how to build the templated data out of the various items extracted from the topic path, together with the incoming data, i.e. the data that was sent to the topic. Topics are delimited by slashes / and the file specifies how to work on the various elements between the slashes. Lines starting with a hash mark # are comments and will be ignored, as will empty lines. All other lines should contain a dash-led key followed by its value. Whenever the value starts with an @-character, its content will be read from the path constituted by the remaining characters of the value. This is particularly useful for complex values that span multiple lines. Relative paths are relative to the directory containing this file. The recognised keys are the following.

  • -marker is a regular expression describing a marker that should be looked for in the path. This marker will have an implicit order number of 0 and the following slash-separated items will have order numbers 1, 2, 3, etc. Whenever no marker is specified, the first item is ranked as 0. When the marker cannot be found, no matching or templating operation will be performed.

  • -match should contain an even-length list alternating a rank number (counted from the marker) and a regular expression. Each expression should match the item in the topic path at its rank number.

  • -template is a string that will be used to form the content of what is being pushed to the Disque queue. In that string, any occurrence of an integer surrounded by % signs, e.g. %1% will be replaced by the value of that item in the topic, starting from the marker. Any occurrence of %data% will be replaced by the data that was sent to the MQTT topic.

An almost real-life example is provided below:

# Force to look for the first possible appearance of myapi in the topic list,
# this is typically the first item in the slashed topic. Topics that do not
# contain myapi will be completely ignored.
-marker   myapi

# Starting from the marker, arrange to force the presence of a version number,
# followed by the keywords device and data.
-match    {1 {1.\d+(.\d)?} 2 device 3 data}

# Some other fields that we wish to collect from the topic will be picked up and
# used in the forward.tpl file
-template @forward.tpl

The content of forward.tpl could look like the following. In this example, fields #4 and #5 in the MQTT topic hierarchy contain customer and location identifiers; these are passed on to the JSON expression that is constructed as part of the template. It would have been possible to enforce some format checking as part of the -match list in the file exemplified above. Note the use of %data%, which copies the data that arrived at the MQTT topic verbatim.

{
    "customer": "%4%",
    "location": "%5%",
    "data": %data%
}
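
As a worked (hypothetical) illustration, assume that the JSON payload {"temperature": 21.5} is published to the topic sensors/myapi/1.2/device/data/acme/stockholm. The marker myapi is found, the items at ranks 1, 2 and 3 satisfy the -match rules above, and the items at ranks 4 and 5 are acme and stockholm. The template above would then expand to the following job data:

{
    "customer": "acme",
    "location": "stockholm",
    "data": {"temperature": 21.5}
}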

Configuration Options

The project can be configured using a number of options. When interfacing with mqtt2any, the best solution is to create environment variables named after the options. For a given dash-led option, remove the leading dash, convert the name to upper case and prefix it with DISQUE_, where DISQUE is the uppercased basename of the implementation file.
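
For instance, since the implementation file is named disque.tcl, the following (hypothetical values) environment variables would set the -host and -queues options:

DISQUE_HOST=disque.example.com
DISQUE_QUEUES=sensors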

Longer options such as password or queues can have a value that starts with an @-sign. In that case, the remaining characters form the path to a file containing the true value of the option. This can be useful for secret or longer values.
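
For example, the password could be kept out of the environment and read from a separate file (hypothetical path) instead:

DISQUE_PASSWORD=@/run/secrets/disque_password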

The dash-led options accepted are as follows:

  • -host Hostname of the Disque server to post templated data to.
  • -port Port number at the Disque server to post templated data to.
  • -password Password at the Disque server. The content of this option can be redirected from a file using a leading @-sign as described above.
  • -queues List of queues at the Disque server to send data to. The content of this option can be redirected from a file using a leading @-sign as described above.
  • -ttl TTL for jobs, in seconds.
  • -retry Max number of seconds for retries.
  • -async Boolean telling whether jobs should be posted to the Disque cluster asynchronously. When set to true (the default), this trusts that the cluster is not being dismantled at the time of job posting in order to guarantee persistence.
  • -replicate Number of job replicas, negative for Disque default.
  • -comment Character leading comments in configuration files. This is # by default; change it at your own risk.
  • -redirect Character detecting redirections of values to file paths. This is @ by default; change it at your own risk.
  • -validate Boolean requiring the data received and used in the template through %data% to be valid JSON. This requires the json package from tcllib to be loadable; the package is lazy-loaded, meaning that it will not be loaded when the boolean is false.

Interfacing with mqtt2any

Procedure Call

You should arrange for the procedure called forward to be the one that is bound to the MQTT topic subscription in the mqtt2any configuration.

The easiest approach is to pass the full path to the configuration file (see above) as an argument to the procedure. Further details are provided in the mqtt2any manual, but this involves the use of the ! special character to separate the path from the name of the procedure.

Another solution is to pass the different options described in the file configuration section above, i.e. -marker, -match and -template, together with their values, as arguments to the procedure called forward. However, this usually quickly leads to lines that are too long to parse visually, given the large number of ! separators required.

Slave Options

As mqtt2any uses sandboxed interpreters, you will have to do some extra configuration work through the options that can be specified as part of the routing specifications. Interesting options to look at are the following (a rough sketch with hypothetical values is shown after the list):

  • -allow to open the firewall and make sure that your slave interpreter is able to access the disque server/cluster.
  • -access to make sure the slave interpreter is able to read the configuration and/or templating files.
  • -path to modify the package access path.
  • -package to help the slave obtain the disque package, even though this is made explicit at the beginning of the implementation.
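
As a rough sketch only, with hypothetical values and paths that must be adapted to your deployment, these options could be given values along the following lines (refer to the mqtt2any manual for the exact syntax of routing specifications):

-allow   disque.example.com:7711
-access  {/etc/mqtt2any/forward.cfg /etc/mqtt2any/forward.tpl}
-path    /opt/tcl/lib
-package disque
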
############
# Module Name - disque.tcl
# Author - Emmanuel Frecon <efrecon@gmail.com>
#
#
# This module is in charge of receiving properly formatted SenML/JSON data and
# of creating as many jobs as there are Disque queues specified, each carrying
# an almost identical copy of the data. This module is generic and the format
# of the job posted to the queues is the result of templating the data
# together with some of the information from the MQTT topic itself.
# Detailed information about the extraction and templating process can be found
# at the beginning of the forward.cfg file.
#
# This module can be configured through a number of dash-led options that appear
# at the beginning. These options can be set by setting environment variables
# starting with the main name of the module in uppercase, followed by an
# underscore, followed by the name of the option without the leading dash.
# Longer options such as password or queue can have a value that will start with
# a @-sign. In that case, the remaining characters will form the path to a file
# containing the true value of the option. This can be useful for secret values
# or longer values. These options are as follows:
#
# -host        Hostname of the Disque server to post templated data to.
#
# -port        Port number at the Disque server to post templated data to.
#
# -password    Password at the Disque server. The content of this option can
#              be redirected from a file using a leading @-sign as described
#              above.
#
# -queues      List of queues at the Disque server to send data to. The
#              content of this option can be redirected from a file using a
#              leading @-sign as described above.
#
# -ttl         TTL for jobs
#
# -retry       Max number of seconds for retries.
#
# -async       Boolean telling if we should post jobs in asynchronous manner
#              to the Disque cluster. This trusts that the cluster is not
#              dismantled at the time of job posting to guarantee persistence
#              when set to true (the default).
#
# -replicate   Number of job replicas, negative for Disque default.
#
# -comment     Character to lead comments in files.
#
# -redirect    Character to detect redirections of values to filepaths.
#
############
# Make this explicit, even if it should have been done from the outside. This
# allows the script to run manually if necessary.
package require disque
array set options {
    -host      "localhost"
    -port      ""
    -password  ""
    -queues    {}
    -ttl       86400
    -retry     300
    -async     on
    -replicate -1
    -comment   "\#"
    -redirect  "@"
    -validate  on
    -attempts  3
    -respit    1000
    obfuscate  {"-password"}
}
# Connection to disque
set ::disque ""
set ::cache [dict create]
set ::queue [list]
# Set options from the environment variables (e.g. content of environment
# variable DISQUE_HOST would change the option -host).
foreach k [array names options -*] {
    set envvar [string toupper [file rootname [file tail [info script]]]]_[string toupper [string trimleft $k -]]
    if { [info exists ::env($envvar)] } {
        if { $k in $options(obfuscate) } {
            debug "Setting option $k to [string repeat "*" [string length [set ::env($envvar)]]] (via environment)" NOTICE
        } else {
            debug "Setting option $k to [set ::env($envvar)] (via environment)" NOTICE
        }
        set options($k) [set ::env($envvar)]
    }
    if { $k in $options(obfuscate) } {
        debug "Option $k is [string repeat "*" [string length $options($k)]]" INFO
    } else {
        debug "Option $k is $options($k)" INFO
    }
}
# Read complex and/or secret options from files whenever appropriate.
foreach opt [list password queues] {
    set opt [string trimleft $opt -]
    if { [string index $::options(-$opt) 0] eq $::options(-redirect) } {
        set fpath [string trim [string range $::options(-$opt) 1 end]]
        debug "Reading content of $opt from $fpath" NOTICE
        set fd [open $fpath]
        set ::options(-$opt) [string trim [read $fd]]
        close $fd
    }
}
# Validation on request only.
if { $::options(-validate) } {
    package require json
}
proc Advance { topic args } {
    set tlist [split $topic "/"]
    # Look for marker and abort if not found.
    if { [dict exists $args -marker] } {
        set idx [lsearch -regexp $tlist [dict get $args -marker]]
    } else {
        set idx 0
    }
    if { $idx < 0 } {
        debug "Cannot find marker [dict get $args -marker] in topic $topic" WARN
        return [list]
    }
    # Advance to marker in topic, so that the marker becomes #0
    return [lrange $tlist $idx end]
}
proc Match { tlist args } {
    # Match against all matching rules and abort as soon as one of the rules
    # does not match.
    if { [dict exists $args -match] } {
        foreach {idx rx} [dict get $args -match] {
            if { ![string is integer -strict $idx] } {
                debug "Matching index $idx is not an integer!" WARN
                return 0
            }
            if { ! [regexp $rx [lindex $tlist $idx]] } {
                return 0
            }
        }
    }
    return 1
}
proc Template { tlist data args } {
    if { [dict exists $args -template] } {
        set mapper [list "%data%" $data]
        for {set i 0} {$i<[llength $tlist]} {incr i} {
            lappend mapper %$i% [lindex $tlist $i]
        }
        return [string map $mapper [dict get $args -template]]
    } else {
        return $data
    }
}
proc Connect { { force 0 } } {
    if { $force } {
        if { $::disque ne "" } {
            if { [catch {$::disque close} err] } {
                debug "Could not properly close Disque connection: $err" ERROR
            }
            set ::disque ""
        }
    }
    if { $::disque eq "" } {
        set node ""
        if { $::options(-password) ne "" } { append node "$::options(-password)@" }
        append node $::options(-host)
        if { $::options(-port) ne "" } { append node ":$::options(-port)" }
        if { [catch {disque -nodes $node} d] } {
            debug "Cannot connect to Disque server at $node: $d" ERROR
        } else {
            debug "Opened connection to Disque server at $node" NOTICE
            set ::disque $d
        }
    }
    return $::disque
}
proc Redirect { dir args } {
    set rargs [list]
    foreach {k v} $args {
        if { [string index $v 0] eq $::options(-redirect) } {
            set fpath [string trim [string range $v 1 end]]
            if { $dir ne "" && [file pathtype $fpath] ne "absolute" } {
                set fpath [file join $dir $fpath]
            }
            debug "Reading content of $k from $fpath" NOTICE
            set fd [open $fpath]
            set v [string trim [read $fd]]
            close $fd
        }
        lappend rargs $k $v
    }
    return $rargs
}
proc Dequeue {} {
    set poll idle
    if { [llength $::queue] } {
        lassign $::queue attempts cmd
        set q [lindex $cmd end-1]
        if { [catch [linsert $cmd 0 [Connect]] jid] == 0 } {
            debug "Created job $jid on queue $q" DEBUG
            set ::queue [lrange $::queue 2 end]
        } else {
            debug "Failed creating job, reconnecting... Failure reason: $jid" NOTICE
            set poll $::options(-respit)
            Connect 1
            incr attempts -1
            if { $attempts > 0 } {
                set ::queue [linsert [lrange $::queue 2 end] 0 $attempts $cmd]
            } else {
                debug "Cannot create job, too many attempts! Data LOST!!" ERROR
            }
        }
    }
    if { [llength $::queue] } {
        after $poll Dequeue
    }
}
proc forward { topic data args } {
    # Nothing to do on empty data...
    if { [string trim $data] eq "" } {
        return
    }
    if { $::options(-validate) } {
        # Parse the JSON, this will only work on well-formed JSON though. Validate
        # the resulting dictionary so we can continue working below.
        if { [catch {::json::json2dict $data} d] } {
            debug "Cannot parse incoming JSON. Data LOST! $d:\n$data" ERROR
            return
        }
    }
    # Arguments are either a single file name (to read the arguments from) or a
    # series of dash-led options and values.
    if { [llength $args] == 1 } {
        set fname [lindex $args 0]
        set args [list]
        if { ![dict exists $::cache $fname] } {
            set fd [open $fname]
            while {![eof $fd]} {
                set line [string trim [gets $fd]]
                if { $line ne "" && [string index $line 0] ne $::options(-comment) } {
                    lassign $line k v
                    lappend args -[string trimleft $k -] $v
                    debug "Remembering -[string trimleft $k -] to $v via $fname" DEBUG
                }
            }
            close $fd
            # Cache for next time
            set args [Redirect [file dirname $fname] {*}$args]
            dict set ::cache $fname $args
        } else {
            set args [dict get $::cache $fname]
        }
    } else {
        set args [Redirect "" {*}$args]
    }
    set tlist [Advance $topic {*}$args]
    if { [llength $tlist] } {
        if { [Match $tlist {*}$args] } {
            set json [Template $tlist $data {*}$args]
            debug "Templated incoming data to:\n$json" TRACE
            # Create new Disque jobs on the queues
            foreach q $::options(-queues) {
                # Construct ADDJOB command using the global options. For some of these
                # options, let Disque decide upon the best, e.g. Disque will replicate
                # to at most 3 nodes of the cluster whenever nothing is specified.
                set cmd [list addjob \
                             -ttl $::options(-ttl) \
                             -retry $::options(-retry)]
                # Add replicate specification only if especially requested to.
                if { $::options(-replicate) ne "" \
                         && [string is integer -strict $::options(-replicate)] \
                         && $::options(-replicate) > 0 } {
                    lappend cmd -replicate $::options(-replicate)
                }
                # Do this asynchronously or not. If not, we will have to wait for the
                # job to be replicated to all nodes
                if { $::options(-async) } {
                    lappend cmd -async
                }
                lappend cmd $q $json
                if { [llength $::queue] } {
                    lappend ::queue $::options(-attempts) $cmd
                } else {
                    lappend ::queue $::options(-attempts) $cmd
                    after idle Dequeue
                }
            }
        }
    }
}
Copyright (c) <2018>, <Emmanuel Frécon>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.