Skip to content

Instantly share code, notes, and snippets.

@gausby
Created Aug 3, 2018
Embed
What would you like to do?

Example!

This example starts four tortoise clients, :a, :b, :c, and :d who subscribe to the same shared topic: bar. Notice that the client expect a broker running on localhost, on port 1883, and accept username/password less connections.

Start the example by typing MqttTest.start() in an iex session; now one can observe the name of the receiving client change when a message is posted to the bar-topic, using: Tortoise.publish(:a, "bar", "hello")

defmodule MqttTest.Handler do
@moduledoc false
require Logger
defstruct [:name]
alias __MODULE__, as: State
@behaviour Tortoise.Handler
@impl true
def init(opts) do
name = Keyword.get(opts, :name)
{:ok, %State{name: name}}
end
@impl true
def connection(:up, state) do
Logger.info("Connection has been established")
{:ok, state}
end
def connection(:down, state) do
Logger.warn("Connection has been dropped")
{:ok, state}
end
@impl true
def subscription(:up, topic, state) do
Logger.info("Subscribed to #{topic}")
{:ok, state}
end
def subscription({:warn, [requested: req, accepted: qos]}, topic, state) do
Logger.warn("Subscribed to #{topic}; requested #{req} but got accepted with QoS #{qos}")
{:ok, state}
end
def subscription({:error, reason}, topic, state) do
Logger.error("Error subscribing to #{topic}; #{inspect(reason)}")
{:ok, state}
end
def subscription(:down, topic, state) do
Logger.info("Unsubscribed from #{topic}")
{:ok, state}
end
@impl true
def handle_message(topic, publish, state) do
Logger.info("#{state.name}: #{Enum.join(topic, "/")} #{inspect(publish)}")
{:ok, state}
end
@impl true
def terminate(reason, _state) do
Logger.warn("Client has been terminated with reason: #{inspect(reason)}")
:ok
end
end
/tmp/mqtt_test » iex -S mix
Erlang/OTP 21 [erts-10.0.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]
=INFO REPORT==== 3-Aug-2018::16:18:26.661585 ===
application: logger
exited: stopped
type: temporary
Interactive Elixir (1.6.5) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> MqttTest.start()
{:ok, #PID<0.183.0>}
iex(2)>
16:18:36.849 [info] Connection has been established
16:18:36.849 [info] Connection has been established
16:18:36.849 [info] Connection has been established
16:18:36.849 [info] Connection has been established
16:18:36.863 [info] Subscribed to $share/share-group/bar
16:18:36.863 [info] Subscribed to $share/share-group/bar
16:18:36.863 [info] Subscribed to $share/share-group/bar
16:18:36.863 [info] Subscribed to $share/share-group/bar
iex(2)> Tortoise.publish(:a, "bar", "hi")
:ok
iex(3)>
16:18:54.260 [info] b: bar "hi"
iex(3)> Tortoise.publish(:a, "bar", "hi")
:ok
iex(4)>
16:18:55.439 [info] a: bar "hi"
iex(4)> Tortoise.publish(:a, "bar", "hi")
:ok
iex(5)>
16:18:56.391 [info] c: bar "hi"
iex(5)> Tortoise.publish(:a, "bar", "hi")
:ok
iex(6)>
16:18:57.431 [info] a: bar "hi"
iex(6)> Tortoise.publish(:a, "bar", "hi")
:ok
iex(7)>
16:18:58.575 [info] a: bar "hi"
defmodule MqttTest do
def start do
{:ok, _} =
Tortoise.Supervisor.start_child(
client_id: :a,
handler: {MqttTest.Handler, [name: :a]},
server: {Tortoise.Transport.Tcp, host: 'localhost', port: 1883},
subscriptions: [{"$share/share-group/bar", 0}])
{:ok, _} =
Tortoise.Supervisor.start_child(
client_id: :b,
handler: {MqttTest.Handler, [name: :b]},
server: {Tortoise.Transport.Tcp, host: 'localhost', port: 1883},
subscriptions: [{"$share/share-group/bar", 0}])
{:ok, _} =
Tortoise.Supervisor.start_child(
client_id: :c,
handler: {MqttTest.Handler, [name: :c]},
server: {Tortoise.Transport.Tcp, host: 'localhost', port: 1883},
subscriptions: [{"$share/share-group/bar", 0}])
{:ok, _} =
Tortoise.Supervisor.start_child(
client_id: :d,
handler: {MqttTest.Handler, [name: :d]},
server: {Tortoise.Transport.Tcp, host: 'localhost', port: 1883},
subscriptions: [{"$share/share-group/bar", 0}])
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment