@justx1
Last active December 24, 2015 11:19
"Streaming" MQTT Sensor Data into TempoDB
require 'uri'
require 'tempodb'
require 'mqtt'
# CloudMQTT connection parameters.
# Build the option hash from CLOUDMQTT_URL (set in .env); fall back to a local broker if it is unset.
uri = URI.parse(ENV['CLOUDMQTT_URL'] || 'mqtt://localhost:1883')
mqtt_conn_opts = {
  remote_host: uri.host,
  remote_port: uri.port,
  username: uri.user,
  password: uri.password
}
# TempoDB connection parameters, read from the TEMPODB_* variables in .env
api_key = ENV['TEMPODB_API_KEY']
api_secret = ENV['TEMPODB_API_SECRET']
api_host = ENV['TEMPODB_API_HOST']
api_port = Integer(ENV['TEMPODB_API_PORT']) # raises if the variable is unset or not numeric
api_secure = ENV['TEMPODB_API_SECURE'] != 'False' # secure unless explicitly set to "False"
# Listener: subscribes to the MQTT broker and writes each incoming message to TempoDB,
# using the topic as the series key and the payload (parsed as a float) as the value.
# The thread runs in the background, so the process must stay alive for messages to be received.
Thread.new do
  client = TempoDB::Client.new(api_key, api_secret, api_host, api_port, api_secure)
  MQTT::Client.connect(mqtt_conn_opts) do |c|
    # Called once per incoming message; '#' is the wildcard that matches every topic
    c.get('#') do |topic, message|
      data = [
        TempoDB::DataPoint.new(Time.now.utc, message.to_f)
      ]
      client.write_key(topic, data)
      sleep 1 # pause one second after each write
    end
  end
end
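
The connection setup above can be tried without a broker or a TempoDB account. The following is a minimal sketch that uses only the standard library. The helper names `mqtt_options_from` and `tempodb_options_from`, the example URL, and the fallback ports (1883 and 443) are assumptions made for illustration and are not part of the original gist.

```ruby
require 'uri'

# Hypothetical helper: turn a broker URL into the option hash
# passed to the MQTT connection in the script above.
def mqtt_options_from(url)
  uri = URI.parse(url)
  {
    remote_host: uri.host,
    remote_port: uri.port || 1883, # assumption: fall back to the standard MQTT port
    username: uri.user,
    password: uri.password
  }
end

# Hypothetical helper: read the TempoDB settings from an ENV-like object,
# tolerating a missing port instead of raising.
def tempodb_options_from(env)
  {
    host: env['TEMPODB_API_HOST'],
    port: Integer(env.fetch('TEMPODB_API_PORT', '443')), # assumption: default to 443
    secure: env['TEMPODB_API_SECURE'] != 'False'         # same rule as the script above
  }
end

# Example URL in the user:password@host:port shape; the values are made up.
opts = mqtt_options_from('mqtt://user:secret@broker.example.com:12345')
puts opts.inspect
```

Passing a plain hash in place of `ENV` (for example `tempodb_options_from('TEMPODB_API_SECURE' => 'False')`) makes it possible to check the parsing rules in isolation before wiring them to real credentials.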