Skip to content

Instantly share code, notes, and snippets.

@feymartynov
Created September 28, 2019 19:01
Show Gist options
  • Save feymartynov/ffa9fca72dded0dad3ade3a0386e6772 to your computer and use it in GitHub Desktop.
Save feymartynov/ffa9fca72dded0dad3ade3a0386e6772 to your computer and use it in GitHub Desktop.
Ruby DSL for ULMS interactions with MQTT

Ruby DSL for ULMS interactions with MQTT

The DSL is useful for development, testing and debugging by enabling quickly writing MQTT interaction scenarios.

Advantages over mosquitto:

  1. Less boilerplate. Actually while providing all the options to mosquitto I can easily forget what I actually want to do with it. This is damn slow.
  2. Ruby is much more nice to read and edit than bash.
  3. Request chains are possible. With mosquitto you have to open two terminals and copy-paste ids from one to another manually which is wild and tedious especially when you have to do it many times while debugging.
  4. Single connection for pub/sub. With mosquitto you have two separate connections with different agent labels so it's impossible to deal with unicasts.
  5. Multiple publishes in a single session are possible which is necessary for certain cases.

Usage

Install ruby-mqtt gem:

gem install mqtt

Require the DSL to your script:

require_relative './dsl'

See examples and documentation in dsl.rb for available methods.

require 'json'
require 'logger'
require 'securerandom'
require 'timeout'
require 'mqtt'
LOG = Logger.new(STDOUT)
LOG.level = Logger::INFO;
DEFAULT_TIMEOUT = 5
###############################################################################
class AssertionError < StandardError; end
class Account
attr_reader :label, :audience
def initialize(label, audience)
@label = label
@audience = audience
end
def to_s
"#{@label}.#{@audience}"
end
end
class Agent
attr_reader :label, :account
def initialize(label, account)
@label = label
@account = account
end
def to_s
"#{@label}.#{@account}"
end
end
class Client
attr_reader :version, :mode, :agent
def initialize(version:, mode:, agent:)
@version = version
@mode = mode
@agent = agent
end
def to_s
"#{@version}/#{@mode}/#{@agent}"
end
end
class Connection
OPTIONS = [:username, :password, :clean_session, :keep_alive]
def initialize(host:, port:, client:, **kwargs)
@client = client
@mqtt = MQTT::Client.new
@mqtt.host = host
@mqtt.port = port
@mqtt.client_id = client.to_s
OPTIONS.each do |option|
@mqtt.send("#{option}=", kwargs[option]) if kwargs[option] != nil
end
end
# Establish the connection.
def connect
@mqtt.connect
LOG.info("#{@client} connected")
end
# Disconnect from the broker.
def disconnect
@mqtt.disconnect
LOG.info("#{@client} disconnected")
end
# Publish a message to the `topic`.
#
# Options:
# - `payload`: An object that will be dumped into JSON as the message payload (required).
# - `properties`: MQTT publish properties hash.
# - `retain`: A boolean indicating whether the messages should be retained.
# - `qos`: An integer 0..2 that sets the QoS.
def publish(topic, payload:, properties: {}, retain: false, qos: 0)
envelope = {
payload: JSON.dump(payload),
properties: properties
}
@mqtt.publish(topic, JSON.dump(envelope), retain, qos)
LOG.info <<~EOF
#{@client.agent} published to #{topic} (q#{qos}, r#{retain ? 1 : 0}):
Payload: #{JSON.pretty_generate(payload)}
Properties: #{JSON.pretty_generate(properties)}
EOF
end
# Subscribe to the `topic`.
#
# Options:
# - `qos`: Subscriptions QoS. An interger 0..2.
def subscribe(topic, qos: 0)
@mqtt.subscribe([topic, qos])
LOG.info("#{@client.agent} subscribed to #{topic} (q#{qos})")
end
# Waits for an incoming message.
# If a block is given it passes the received message to the block.
# If the block returns falsey value it waits for the next one and so on.
# Returns the received message.
# Raises if `timeout` is over.
def receive(timeout=DEFAULT_TIMEOUT)
Timeout::timeout(timeout, nil, "Timed out waiting for the message") do
loop do
topic, json = @mqtt.get
envelope = JSON.load(json)
payload = JSON.load(envelope['payload'])
message = IncomingMessage.new(topic, payload, envelope['properties'])
LOG.info <<~EOF
#{@client.agent} received a message from topic #{topic}:
Payload: #{JSON.pretty_generate(message.payload)}
Properties: #{JSON.pretty_generate(message.properties)}
EOF
return message unless block_given?
if yield(message)
LOG.info "The message matched the given predicate"
return message
else
LOG.info "The message didn't match the given predicate. Waiting for the next one."
end
end
end
end
# A high-level method that makes a request and waits for the response on it.
#
# Options:
# - `to`: the destination service `Account` (required).
# - `payload`: the publish message payload (required).
# - `properties`: additional MQTT properties hash.
# - `qos`: Publish QoS. An integer 0..2.
# - `timeout`: Timeout for the response awaiting.
def make_request(method, to:, payload:, properties: {}, qos: 0, timeout: DEFAULT_TIMEOUT)
correlation_data = SecureRandom.hex
properties.merge!({
type: 'request',
method: method,
correlation_data: correlation_data,
response_topic: "agents/#{@client.agent}/api/v1/in/#{to}"
})
topic = "agents/#{@client.agent}/api/v1/out/#{to}"
publish(topic, payload: payload, properties: properties, qos: qos)
receive(timeout) do |msg|
msg.properties['type'] == 'response' &&
msg.properties['correlation_data'] == correlation_data
end
end
end
class IncomingMessage
attr_reader :topic, :payload, :properties
def initialize(topic, payload, properties)
@topic = topic
@payload = payload
@properties = properties
end
# A shortcut for payload fields. `msg['key']` is the same as `msg.payload['key']`.
def [](key)
@payload[key]
end
end
###############################################################################
# Raises unless the given argument is truthy.
def assert(value)
raise AssertionError.new("Assertion failed") unless value
end
# Builds an `Agent` instance.
def agent(label, account)
Agent.new(label, account)
end
# Builds an `Account` instance.
def account(label, audience)
Account.new(label, audience)
end
# Builds a `Client` instance.
#
# Options:
# - `mode`: Connection mode (required). Available values: `agents`, `service-agents`, `bridge-agents`, `observer-agents`.
# - `version`: Always `v1` for now.
def client(agent, mode:, version: 'v1')
Client.new(version: version, mode: mode, agent: agent)
end
# Connects to the broker and subscribes to the client's inbox topics.
#
# Options:
# - `host`: The broker's host (required).
# - `port`: The broker's TCP port for MQTT connections (required).
# - `client`: The `Client` object (required).
# - `username`: If the broker has authn enabled this requires any non-empty string.
# - `password`: If the broker has authn enalbed this requires the password for the `client`'s account.
# - `clean_session`: A boolean indicating whether the broker has to clean the previos session.
# - `keep_alive`: Keep alive time in seconds.
def connect(host: 'localhost', port: 1883, client:, **kwargs)
conn = Connection.new(host: host, port: port, client: client, **kwargs)
conn.connect
conn.subscribe("agents/#{client.agent}/api/v1/#")
conn
end
# This is an example of a request chain to conference.
# We make a request, get the result id, put it into the next one and so on.
require_relative './dsl'
me = agent('web', account('fey', 'dev.usr.example.org'))
conference = account('conference', 'dev.svc.example.org')
conn = connect host: 'localhost', port: 1883, client: client(me, mode: 'agents')
# Create a room.
response = conn.make_request 'room.create', to: conference, payload: {
audience: 'dev.svc.example.org',
time: [nil, nil]
}
assert response.properties['status'] == '200'
# Create an rtc in the room.
response = conn.make_request 'rtc.create', to: conference, payload: {
room_id: response['id']
}
assert response.properties['status'] == '200'
# Connect to the rtc.
conn.make_request 'rtc.connect', to: conference, payload: {
id: response['id']
}
assert response.properties['status'] == '200'
# This is an example of talking to janus-conference.
# It can't use svc-agent because of Janus Gateway's architecture so the protocol is quite different.
# Because of that we can't use `make_request` but have to call raw `publish` and `subscribe`.
# Before making the actual request Janus requires use to create a session and a plugin handle
# in order to save the state between requests and route messages to plugins.
require_relative './dsl'
rtc_id = '00716b55-8cbf-412b-96df-199c171b1d33'
audience = 'dev.svc.example.org'
conference = agent("alpha", account('conference', audience))
janus = agent("alpha", account('janus-gateway', audience))
janus_inbox = "agents/#{janus}/api/v1/in/#{conference.account}"
conn = connect host: 'localhost', port: 1883, client: client(conference, mode: 'service-agents')
conn.subscribe "apps/#{janus.account}/api/v1/responses"
# Get session.
conn.publish janus_inbox, payload: { janus: 'create', transaction: 'txn-session' }
response = conn.receive { |msg| msg['transaction'] == 'txn-session' }
assert response['janus'] == 'success'
session_id = response['data']['id']
# Get handle.
conn.publish janus_inbox, payload: {
janus: 'attach',
session_id: session_id,
plugin: 'janus.plugin.conference',
transaction: 'txn-handle'
}
response = conn.receive { |msg| msg['transaction'] == 'txn-handle' }
assert response['janus'] == 'success'
handle_id = response['data']['id']
# Make `stream.upload` request.
conn.publish janus_inbox, payload: {
janus: 'message',
session_id: session_id,
handle_id: handle_id,
transaction: 'txn-upload',
body: {
method: 'stream.upload',
id: rtc_id,
bucket: "origin.webinar.#{audience}",
object: "#{rtc_id}.source.mp4"
}
}
response = conn.receive { |msg| msg['transaction'] == 'txn-upload' }
assert response['janus'] == 'ack'
response = conn.receive(30) { |msg| msg['transaction'] == 'txn-upload' }
assert response['janus'] == 'event'
assert response['plugindata']['data']['status'] == '200'
# This is an example of sending a broadcast message to a room.
# I used this one to test them when the broker part wasn't ready at the moment so at first
# it simulates the broker's job by sending `subscription.create` event to conference.
# Then it makes the `message.broadcast` request itself as a user.
# So this example illustrates working with events and multiple connections.
require 'json'
require_relative './dsl'
room_id = '3a7c5e97-726f-4313-8163-2b834f7317b3'
svc_audience = 'dev.svc.example.org'
usr_audience = 'dev.usr.example.org'
broker = agent('alpha', account('mqtt-gateway', svc_audience))
user = agent('web', account('fey', usr_audience))
conference = account('conference', svc_audience)
conn_opts = { host: 'localhost', port: 1883 }
broker_conn = connect conn_opts.merge(client: client(broker, mode: 'service-agents'))
user_conn = connect conn_opts.merge(client: client(user, mode: 'agents'))
# Put user online into the room.
broker_conn.publish "agents/#{user}/api/v1/out/#{conference}",
payload: {
subject: user,
object: ['rooms', room_id, 'events']
},
properties: {
type: 'event',
label: 'subscription.create'
}
# Send broadcast message.
response = user_conn.make_request 'message.broadcast', to: conference, payload: {
room_id: room_id,
data: JSON.dump(key: 'value')
}
# Receive broadcast message in the room's events topic.
response = user_conn.receive do |msg|
msg.topic == "apps/#{conference}/api/v1/rooms/#{room_id}/events"
end
assert JSON.load(response.payload)['key'] == 'value'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment