Skip to content

Instantly share code, notes, and snippets.

@vigevenoj
Last active March 21, 2016 03:45
Show Gist options
  • Save vigevenoj/795ffbb05a3aeb0e2c60 to your computer and use it in GitHub Desktop.
Save vigevenoj/795ffbb05a3aeb0e2c60 to your computer and use it in GitHub Desktop.
mqtt_influxdb_bridge.rb
#!/opt/rh/ruby193/root/usr/bin/ruby
# -*- encoding: utf-8 -*-
#
# mqtt & influxdb sample...
#
# libraries
# $ gem install mqtt
# $ gem install influxdb
# $ gem install logger
#
require 'rubygems'
require 'mqtt'
require 'json'
require 'logger'
require 'influxdb'
require 'yaml'
class MqttInfluxBridge
Version = '0.0.1'
def initialize(arguments, stdin)
configuration = YAML::load_file(File.join(File.dirname(__FILE__), 'config.yaml'))
@mqtt = MQTT::Client.new
@mqtt.host = configuration['mqtt']['host']
@mqtt.port = configuration['mqtt']['port']
@mqtt.ssl = configuration['mqtt']['ssl']
@mqtt.ca_file = configuration['mqtt']['ca_cert']
@mqtt.username = configuration['mqtt']['username']
@mqtt.password = configuration['mqtt']['password']
@topic = configuration['mqtt']['topic'] || 'sensors/#'
@influx = InfluxDB::Client.new configuration['influxdb']['database'],
username: configuration['influxdb']['username'],
password: configuration['influxdb']['password']
STDOUT.sync = true
#@log = Logger.new(STDOUT)
@log = Logger.new('mqtt-influx.log', 10, 1048576)
@log.level = Logger::INFO
end
def handle_single_measurement(tags, json)
# validate that the type matches the type in the message as well
# compare tags[2] to json['type']
@log.info("#{json['type']} vs #{tags[2]}")
# if json['type'] == tags[2]
# Then connect to influxdb and write this point
data = {
series: json['type'],
tags: { address: tags[0], location: tags[1] },
values: json
}
@influx.write_point(json['type'], data, 's')
end
def handle_message(topic, message)
@log.info("received mqtt message : topic=#{topic}, message=#{message}")
# Split the topic string into tags.
# We know the first element is "sensors" so we can strip that,
# but we want the rest (address, location, and type, by our convention)
# sensors/address/location / eg, sensors/harold/office
tags = topic.split("/", 2)[1].split("/")
begin
message.gsub!(/'/, "\"") # An older version of the source client used single quotes in the message
json = JSON.parse(message)
if json.is_a?(Hash)
handle_single_measurement(tags, json)
elsif json.is_a?(Array)
json.each { |msg| handle_single_measurement(tags, msg) }
end
rescue JSON::ParserError => e
# invalid json message, bail out and wait for the next message
end
end
def run
begin
@mqtt.connect() do |c|
@log.info("Connection start...")
c.get(@topic) do |topic, message|
handle_message topic, message
end
end # end of connection
rescue Exception => e
@log.error(e)
end
end #end of run block
end #end class def
app = MqttInfluxBridge.new(ARGV, STDIN)
app.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment