Skip to content

Instantly share code, notes, and snippets.

Last active July 17, 2017 03:52
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save heri16/26bc1a7be4648cbdf20898df3ee283d3 to your computer and use it in GitHub Desktop.
Save heri16/26bc1a7be4648cbdf20898df3ee283d3 to your computer and use it in GitHub Desktop.
Elixir ThingShadow Client for AWS IoT based on emqttc
defmodule Aws.Iot.ThingShadow do
@moduledoc ~S"""
Implements the AWS IoT ThingShadow Data Flow behaviour.
Closely follows the logic from the official nodejs library, with minor deviations to adapt to Elixir Coding Conventions:
Elixir/Erlang's GenEvent executes handlers sequentially in a single loop of the GenEvent.manager process.
To avoid this bottleneck, please ensure that each ThingShadow process has a different `GenEvent.manager`.
Also, please use `ThingShadow.EventHandler` module together with `GenEvent.add_mon_handler/3` if available.
use GenServer
## Client API ##
@doc """
Starts a `ThingShadow` process linked to the current process.
There are 2 ways to start the process.
## Method 1
- `event_manager` should be a `GenEvent.manager` type, procuded by calling `GenEvent.start_link/1`
- `mqtt_options` should be in the Keyword format that is expected by `:emqttc.start_link/1`
## Method 2
- `event_manager` should be a `GenEvent.manager` type, procuded by calling `GenEvent.start_link/1`
- `app_name` should be the Mix/OTP application name (atom) that has been configure in config/config.exs file
The configuration looks like this:
config :app_name, __MODULE__,
host: "",
port: 8883,
client_id: "xxxxxxx",
thing_name: "xxxxxxx",
ca_cert: "config/certs/root-CA.crt",
client_cert: "config/certs/xxxxxxxxxx-certificate.pem.crt",
private_key: "config/certs/xxxxxxxxxx-private.pem.key"
## Return values
If the server is successfully created and initialized, the function returns
`{:ok, pid}`, where pid is the pid of the server.
Use `pid` with other functions defined in this module.
If the server could not start, the process is terminated and the function returns
`{:error, reason}`, where reason is the error reason.
@spec start_link(GenEvent.manager, atom | [{atom, any}], Keyword.t) :: GenServer.on_start
def start_link(event_manager, mqtt_options_or_app_name \\ :aws, start_options \\ [])
def start_link(event_manager, mqtt_options, start_options) when is_list(mqtt_options) and is_list(start_options) do
unless is_pid(event_manager) do
# Convert to distribution pid which can be reached even on other BEAM nodes
event_manager = Process.whereis(event_manager)
GenServer.start_link(__MODULE__, [event_manager, mqtt_options], start_options)
def start_link(event_manager, app_name, start_options) when is_atom(app_name) and is_list(start_options) do
# Event_manager is required to emit events. Use: {:ok, event_manager} = GenEvent.start_link([])
# Get configuration from environment set by config.exs
mix_config_options = Application.get_env(app_name, __MODULE__)
mqtt_options = [
host: to_char_list(mix_config_options[:host]),
port: mix_config_options[:port],
client_id: mix_config_options[:client_id],
clean_sess: true,
keepalive: 60,
connack_timeout: 30,
reconnect: {3, 60},
logger: :info,
ssl: [
cacertfile: to_char_list(mix_config_options[:ca_cert]),
certfile: to_char_list(mix_config_options[:client_cert]),
keyfile: to_char_list(mix_config_options[:private_key])
# Let emqttc resubscribe topics when reconnected.
mqtt_options = [:auto_resub | mqtt_options]
start_link(event_manager, mqtt_options, start_options)
@doc """
Register interest in the Thing Shadow named `thing_name`. The thingShadow process will subscribe to any applicable topics, and will fire events for the Thing Shadow until `unregister/2` is called with `thing_name`. `options` can contain the following arguments to modify how this Thing Shadow is processed:
`ignore_seltas`: set to `true` to not subscribe to the delta sub-topic for this Thing Shadow; used in cases where the application is not interested in changes (e.g. update only.) (default `false`)
`persistent_subscribe`: set to `false` to unsubscribe from all operation sub-topics while not performing an operation (default `true`)
`discard_stale`: set to `false` to allow receiving messages with old version numbers (default `true`)
`enable_versioning`: set to `true` to send version numbers with shadow updates (default `true`)
The `persistent_subscribe` argument allows an application to get faster operation responses at the expense of potentially receiving more irrelevant response traffic (i.e., response traffic for other clients who have registered interest in the same Thing Shadow). When persistent_subscribe is set to `false`, operation sub-topics are only subscribed to during the scope of that operation; note that in this mode, update, get, and delete operations will be much slower; however, the application will be less likely to receive irrelevant response traffic.
The `discard_stale` argument allows applications to receive messages which have obsolete version numbers. This can happen when messages are received out-of-order; applications which set this argument to `false` should use other methods to determine how to treat the data (e.g. use a time stamp property to know how old/stale it is).
If `enable_versioning` is set to `true`, version numbers will be sent with each operation. AWS IoT maintains version numbers for each shadow, and will reject operations which contain the incorrect version; in applications where multiple clients update the same shadow, clients can use versioning to avoid overwriting each other's changes.
@spec register(GenServer.t, String.t, Keyword.t) :: {:ok, atom} | {:error, any}
def register(pid, thing_name, options \\ [qos: 1]) when is_binary(thing_name) and is_list(options) do, {:thing_register, thing_name, options})
@doc """
Unregister interest in the Thing Shadow named `thing_name`. The thingShadow process will unsubscribe from all applicable topics and no more events will be fired for thing_name.
@spec unregister(GenServer.t, String.t) :: {:ok, atom} | {:error, any}
def unregister(pid, thing_name) when is_binary(thing_name) do, {:thing_unregister, thing_name})
@doc """
Update the Thing Shadow named `thing_name` with the state specified in the object `shadow_state_object`. `thing_name` must have been previously registered using `register/3`. The thingShadow process will subscribe to all applicable topics and publish `shadow_state_object` on the update sub-topic.
If the operation is in progress, this function returns `{:ok, client_token}`.
`client_token` is a unique value associated with the update operation. When a 'status' or 'timeout' event is emitted, the client_token will be supplied as one of the parameters, allowing the application to keep track of the status of each operation. The caller may create their own client_token value; if `shadow_state_object` contains a client_token property, that will be used rather than the internally generated value. Note that it should be of atomic type (i.e. numeric or string). This function returns `nil` if an operation is already in progress.
`operation_timeout` (milliseconds). If no accepted or rejected response to a thing operation is received within this time, subscriptions to the accepted and rejected sub-topics for a thing are cancelled.
@spec update(GenServer.t, String.t, %{atom => any}) :: {:ok, atom} | {:error, any}
def update(pid, thing_name, shadow_state_object, operation_timeout \\ 10000) when is_binary(thing_name) and is_map(shadow_state_object) and operation_timeout >= 0 do
#shadow_state_object = %{
# state: %{
# reported: %{
# "awsSqsSyncLastMessageId" => "3c52e73a-284c-4dc1-80d6-2d01d64f5b35"
# }
# }
#}, {:thing_operation, :update, thing_name, shadow_state_object, operation_timeout})
@doc """
Get the current state of the Thing Shadow named `thing_name`, which must have been previously registered using `register/3`. The thingShadow process will subscribe to all applicable topics and publish on the get sub-topic.
If the operation is in progress, this function returns `{:ok, client_token}`.
`client_token` is a unique value associated with the get operation. When a 'status or 'timeout' event is emitted, the client_token will be supplied as one of the parameters, allowing the application to keep track of the status of each operation. The caller may supply their own client_token value (optional); if supplied, the value of client_token will be used rather than the internally generated value. Note that this value should be of atomic type (i.e. numeric or string). This function returns `nil` if an operation is already in progress.
`operation_timeout` (milliseconds). If no accepted or rejected response to a thing operation is received within this time, subscriptions to the accepted and rejected sub-topics for a thing are cancelled.
@spec get(GenServer.t, String.t, String.t) :: {:ok, atom} | {:error, any}
def get(pid, thing_name, client_token \\ "", operation_timeout \\ 10000) when is_binary(thing_name) and is_binary(client_token) and operation_timeout >= 0 do
shadow_state_object = if (client_token == ""), do: %{}, else: %{clientToken: client_token}, {:thing_operation, :get, thing_name, shadow_state_object, operation_timeout})
@doc """
Delete the Thing Shadow named `thing_name`, which must have been previously registered using `register/3`. The thingShadow process will subscribe to all applicable topics and publish on the delete sub-topic.
If the operation is in progress, this function returns `{:ok, client_token}`.
`client_token` is a unique value associated with the delete operation. When a 'status' or 'timeout' event is emitted, the client_token will be supplied as one of the parameters, allowing the application to keep track of the status of each operation. The caller may supply their own client_token value (optional); if supplied, the value of client_token will be used rather than the internally generated value. Note that this value should be of atomic type (i.e. numeric or string). This function returns `nil` if an operation is already in progress.
`operation_timeout` (milliseconds). If no accepted or rejected response to a thing operation is received within this time, subscriptions to the accepted and rejected sub-topics for a thing are cancelled.
@spec delete(GenServer.t, String.t, String.t) :: {:ok, atom} | {:error, any}
def delete(pid, thing_name, client_token \\ "", operation_timeout \\ 10000) when is_binary(thing_name) and is_binary(client_token) and operation_timeout >= 0 do
shadow_state_object = if (client_token == ""), do: %{}, else: %{clientToken: client_token}, {:thing_operation, :get, thing_name, shadow_state_object, operation_timeout})
@doc """
Identical to the `:emqttc.publish/3` method, with the restriction that the topic may not represent a Thing Shadow. This method allows the user to publish messages to topics on the same connection used to access Thing Shadows.
@spec publish(GenServer.t, String.t, any, Keyword.t) :: :ok | {:error, any}
def publish(pid, topic, payload, options \\ [qos: 0]) when is_binary(topic) and is_list(options) do, {:thing_publish, topic, payload, options})
@doc """
Identical to the `:emqttc.subscribe/2` method, with the restriction that the topic may not represent a Thing Shadow. This method allows the user to subscribe to messages from topics on the same connection used to access Thing Shadows.
@spec subscribe(GenServer.t, String.t | {String.t, atom | integer} | [{String.t, atom | integer}], Keyword.t) :: :ok | {:error, any}
def subscribe(pid, topics, options \\ [qos: 0])
def subscribe(pid, topics_with_qos = [ {topic, qos} | _ ], options) when is_binary(topic) and (is_atom(qos) or is_integer(qos)) and is_list(options) do, {:thing_subscribe, topics_with_qos, options})
def subscribe(pid, topic_with_qos = {topic, qos}, options) when is_binary(topic) and (is_atom(qos) or is_integer(qos)) and is_list(options) do, {:thing_subscribe, topic_with_qos, options})
def subscribe(pid, topic, options) when is_binary(topic) and is_list(options) do
if options[:qos] == nil do
{:error, "options is missing qos"}
else, {:thing_subscribe, topic, options})
@doc """
Identical to the `:emqttc.unsubscribe/1` method, with the restriction that the topic may not represent a Thing Shadow. This method allows the user to unsubscribe from topics on the same used to access Thing Shadows.
@spec unsubscribe(GenServer.t, String.t | [String.t]) :: :ok | {:error, any}
def unsubscribe(pid, topics = [ topic | _ ]) when is_binary(topic) do, {:thing_unsubscribe, topics})
def unsubscribe(pid, topic) when is_binary(topic) do, {:thing_unsubscribe, topic})
@doc """
Invokes the `GenServer.stop/3` method on the thingShadow process. This causes `terminate/2` to be called and the MQTT connection owned by the thingShadow process to be disconnected. The force parameters is optional and identical in function to the `:shutdown` reason parameter in the `GenServer.stop/3` method.
@spec endConnection(GenServer.t, boolean) :: :ok
def endConnection(pid, force \\ false)
def endConnection(pid, _force = true) do
GenServer.stop(pid, :shutdown, :infinity)
def endConnection(pid, _force) do
GenServer.stop(pid, :normal, :infinity)
## GenServer Callbacks ##
@doc """
Responsible for connecting to the MQTT broker
def init([event_manager, mqtt_options]) do
# GenEvent.manager needs to be monitored (on top of supervision).
# Monitor means that ThingShadow GenServer process should stop when its event_manager exits/crashes.
# However, the event_manager should not stop if the ThingShadow GenServer process crashes.
_monitor_ref = Process.monitor(event_manager)
if Process.alive?(event_manager) == false do
{:stop, :event_manager_down}
try do
{:ok, client} = :emqttc.start_link(mqtt_options)
{:ok, %{mqttc: client, event_manager: event_manager, thing_shadows: %{}, client_id: mqtt_options[:client_id], seq: 0 } }
e -> {:stop, e}
@doc """
Responsible for handling thingshadow register-interest calls
## Options ##
* qos: set to the desired QoS level :qos0 or :qos1 for the MQTT messages (default :qos0)
* ignore_deltas: set to true to not subscribe to the delta sub-topic for this Thing Shadow; used in cases where the application is not interested in changes (e.g. update only.) (default false)
* persistent_subscribe: set to false to unsubscribe from all operation sub-topics while not performing an operation (default true)
* discard_stale: set to false to allow receiving messages with old version numbers (default true)
* enable_versioning: set to true to send version numbers with shadow updates (default true)
def handle_call({:thing_register, thing_name, opts}, _from, state = %{mqttc: client, thing_shadows: thing_shadows}) when is_binary(thing_name) and is_list(opts) do
# DONE: Implement version keeping and options[:discard_stale]
# DONE: Delay subscribe to during update/delete operation only, if options[:persistent_subscribe]
# DONE: Send version numbers with shadow updates if options[:enable_versioning]
# DONE: Do not subscribe to delta topics if options[:ignore_deltas]
ignore_deltas = Keyword.get(opts, :ignore_deltas, false)
persistent_subscribe = Keyword.get(opts, :persistent_subscribe, true)
discard_stale = Keyword.get(opts, :discard_stale, true)
enable_versioning = Keyword.get(opts, :enable_versioning, true)
qos = Keyword.get(opts, :qos, :qos0)
current_thing = %{
operation_timers: %{},
persistent_subscribe: persistent_subscribe,
discard_stale: discard_stale,
enable_versioning: enable_versioning,
qos: qos
new_state = %{state | thing_shadows: Map.put_new(thing_shadows, thing_name,current_thing) }
# Exclude delta topic if ignore_delta == true
subscribe_result = case ignore_deltas do
false ->
case persistent_subscribe do
true ->
handle_subscriptions(client, {:subscribe, thing_name, [:update, :get, :delete], [:delta, :accepted, :rejected]}, current_thing)
false ->
handle_subscriptions(client, {:subscribe, thing_name, [:update], [:delta]}, current_thing)
true ->
case persistent_subscribe do
true ->
handle_subscriptions(client, {:subscribe, thing_name, [:update, :get, :delete], [:accepted, :rejected]}, current_thing)
false ->
# Do nothing
{:ok, :noop}
case subscribe_result do
:ok ->
# Update to new state on ok
{:reply, :ok, new_state}
{:ok, _} ->
# Update to new state on ok
{:reply, :ok, new_state}
:error ->
# Keep previous state on error
{:reply, :error, state}
{:error, _} ->
# Keep previous state on error
{:reply, :error, state}
@doc """
Responsible for handling thingshadow update/get/delete operation-requests for a registered thing
def handle_call({:thing_operation, operation, thing_name, shadow_state_object, operation_timeout}, _from, state = %{mqttc: client, thing_shadows: thing_shadows, client_id: client_id, seq: seq}) when is_atom(operation) and is_binary(thing_name) do
case thing_shadows do
%{^thing_name => current_thing} ->
# Generate client_token if missing from shadow_state_object
client_token = shadow_state_object[:clientToken] || shadow_state_object["clientToken"] || "#{client_id}-#{seq}"
shadow_state_object = Map.put_new(shadow_state_object, :clientToken, client_token)
# Subscribe to response topics (if persistent_subscribe is false)
case handle_persistent_subscriptions(client, {:subscribe, thing_name, [operation], [:accepted, :rejected]}, current_thing) do
{:ok, _} ->
# After a period of time, we are no longer interested in the accepted/rejected response, so we unsubscribe (if persistent_subscribe is false)
operation_timer = Process.send_after(self(), {:thing_operation_timeout, thing_name, operation, client_token}, operation_timeout)
publish_topic = build_thing_shadow_topic(thing_name, operation)
case publish_shadow_state_to_topic(client, publish_topic, shadow_state_object, current_thing) do
:ok ->
current_thing = current_thing |> Map.update(:operation_timers, %{client_token => operation_timer}, fn current_operation_timers ->
Map.put_new(current_operation_timers, client_token, operation_timer)
# Update state of inflight_tokens in current_thing and increment seq
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:reply, {:ok, client_token}, %{state | thing_shadows: thing_shadows, seq: seq+1 } }
other ->
# Would rather not reuse sequence number even on publish error,
# in case message reaches broker eventually, causing AWS IoT responses with same client_token
{:reply, other, %{state | seq: seq+1 }}
other ->
{:reply, other, state}
_no_match ->
{:reply, {:error, "Attempting to #{operation} unknown thing: #{thing_name}. Please register beforehand."}, state }
@doc """
Responsible for handling publish requests on non-thing topics
def handle_call({:thing_publish, topic, payload, opts}, _from, state = %{mqttc: client}) when is_binary(topic) do
publish_result = :emqttc.publish(client, topic, payload, opts)
{:reply, publish_result, state}
@doc """
Responsible for handling subscribe requests on non-thing topics
def handle_call({:thing_subscribe, topics, _opts}, _from, state = %{mqttc: client}) when is_list(topics) do
subscribe_result = :emqttc.subscribe(client, topics)
{:reply, subscribe_result, state}
def handle_call({:thing_subscribe, topic, _opts}, _from, state = %{mqttc: client}) when is_tuple(topic) do
subscribe_result = :emqttc.subscribe(client, topic)
{:reply, subscribe_result, state}
def handle_call({:thing_subscribe, topic, opts}, _from, state = %{mqttc: client}) when is_binary(topic) do
subscribe_result = :emqttc.subscribe(client, topic, opts[:qos])
{:reply, subscribe_result, state}
@doc """
Responsible for handling unsubscribe requests on non-thing topics
def handle_call({:thing_unsubscribe, topics}, _from, state = %{mqttc: client}) when is_list(topics) do
unsubscribe_result = :emqttc.unsubscribe(client, topics)
{:reply, unsubscribe_result, state}
def handle_call({:thing_unsubscribe, topic}, _from, state = %{mqttc: client}) when is_binary(topic) do
unsubscribe_result = :emqttc.subscribe(client, topic)
{:reply, unsubscribe_result, state}
@doc """
Responsible for receiving MQTT broker connected event from mqttc client
def handle_info({:mqttc, client, :connected}, state = %{mqttc: client, event_manager: handlers}) do
IO.puts "MQTT client #{inspect(client)} is connected"
GenEvent.ack_notify(handlers, {:connect, self})
{:noreply, state}
@doc """
Responsible for receiving MQTT broker disconnected event from mqttc client
def handle_info({:mqttc, client, :disconnected}, state = %{mqttc: client, event_manager: handlers}) do
IO.puts "MQTT client #{inspect(client)} is disconnected"
GenEvent.ack_notify(handlers, {:disconnect, self})
{:noreply, state}
@doc """
Responsible for receiving event_manager down notification.
ThingShadow GenServer process should stop when event_manager is dead.
def handle_info({:DOWN, _ref, :process, {event_manager, _node}, _reason}, state = %{event_manager: event_manager} ) do
{:stop, :event_manager_down, state}
@doc """
Responsible for receiving operation timeout messages
def handle_info({:thing_operation_timeout, thing_name, operation, client_token}, state = %{mqttc: client, event_manager: handlers, thing_shadows: thing_shadows}) do
case thing_shadows do
%{^thing_name => current_thing} ->
# Unsubscribe from response topics (if persistent_subscribe is false)
handle_persistent_subscriptions(client, {:unsubscribe, thing_name, [operation], [:accepted, :rejected]}, current_thing)
# Notify timeout event handlers
GenEvent.ack_notify(handlers, {:timeout, thing_name, client_token})
# Remove client_token from operation_timers in current_thing
current_thing = current_thing |> Map.update(:operation_timers, %{}, fn current_operation_timers ->
current_operation_timers |> Map.delete(client_token)
# Update state of version in current_thing
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:noreply, %{state | thing_shadows: thing_shadows}}
_no_match ->
{:noreply, state}
@doc """
Responsible for receiving MQTT messages from mqttc client
def handle_info({:publish, topic, payload}, state) do
IO.puts "Message from #{topic}: #{payload}"
# Handle topics by pattern-matching
topic |> String.split("/", parts: 6) |> handle_topic_message(payload, state)
@doc """
Responsible for receiving MQTT messages from thingshadow topics of registered things
def handle_topic_message(topic_iolist = ["$aws", "things", thing_name, "shadow", "update", "delta"], payload, state = %{event_manager: handlers, thing_shadows: thing_shadows}) when is_binary(thing_name) do
# Handle delta response message
case thing_shadows do
%{^thing_name => current_thing} ->
case Poison.Parser.parse(payload) do
{:ok, shadow_state_object} ->
_client_token = shadow_state_object["clientToken"] || shadow_state_object[:clientToken]
version = shadow_state_object["version"] || shadow_state_object[:version]
case update_version_in_thing(current_thing, version, :update) do
nil ->
# Do nothing as message is to be discarded
{:noreply, state}
current_thing ->
# Notify delta event handlers
GenEvent.ack_notify(handlers, {:delta, thing_name, shadow_state_object})
# Update state of version in current_thing
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:noreply, %{state | thing_shadows: thing_shadows}}
{:error, _reason} ->
{:noreply, state}
_no_match ->
GenEvent.ack_notify(handlers, {:message, to_string(topic_iolist), payload})
{:noreply, state}
def handle_topic_message(topic_iolist = ["$aws", "things", thing_name, "shadow", operation, status], payload, state = %{mqttc: client, event_manager: handlers, thing_shadows: thing_shadows}) when is_binary(thing_name) and is_binary(operation) and is_binary(status) do
# Handle accepted/rejected response message
case thing_shadows do
%{^thing_name => current_thing} ->
# Convert string to atoms
operation = case operation do
"update" -> :update
"get" -> :get
"delete" -> :delete
status = case status do
"accepted" -> :accepted
"rejected" -> :rejected
case Poison.Parser.parse(payload) do
{:ok, shadow_state_object} ->
client_token = shadow_state_object["clientToken"] || shadow_state_object[:clientToken]
version = shadow_state_object["version"] || shadow_state_object[:version]
case update_version_in_thing(current_thing, version, operation) do
nil ->
# Do nothing as message is to be discarded
{:noreply, state}
current_thing = %{operation_timers: _current_operation_timers = %{^client_token => _} } ->
# Cancel operation_timeout timer, and Remove client_token from operation_timers in current_thing
current_thing = current_thing |> Map.update(:operation_timers, %{}, fn current_operation_timers ->
{operation_timer, new_operation_timers} = current_operation_timers |> Map.pop(client_token)
if (operation_timer != nil), do: Process.cancel_timer(operation_timer)
# Unsubscribe if persistent_subscribe is false
handle_persistent_subscriptions(client, {:unsubscribe, thing_name, [operation], [:accepted, :rejected]}, current_thing)
# Notify status event handlers
GenEvent.ack_notify(handlers, {:status, thing_name, status, client_token, shadow_state_object})
# Update state of version and operation_timers in current_thing
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:noreply, %{state | thing_shadows: thing_shadows}}
current_thing when status == :accepted and operation != :get ->
# This operation is not made from this client
GenEvent.ack_notify(handlers, {:foreignStateChange, thing_name, operation, shadow_state_object})
# Update state of version in current_thing
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:noreply, %{state | thing_shadows: thing_shadows}}
current_thing when true ->
# Just update the version state as this operation is not made from this client, and is not relevant to local event handlers
thing_shadows = %{thing_shadows | thing_name => current_thing}
{:noreply, %{state | thing_shadows: thing_shadows}}
{:error, _reason} ->
{:noreply, state}
_no_match ->
GenEvent.ack_notify(handlers, {:message, to_string(topic_iolist), payload})
{:noreply, state}
def handle_topic_message(topic_iolist, payload, state = %{event_manager: handlers}) when is_list(topic_iolist) do
# Handle messages from other topics
GenEvent.ack_notify(handlers, {:message, to_string(topic_iolist), payload})
{:noreply, state}
def terminate(reason, state = %{mqttc: client} ) do
if Process.alive?(client) do
:ok = :emqttc.disconnect(client)
super(reason, state)
## Helper Functions ##
### Handles subscribe and unsubscribe actions on thingshadow topics
defp handle_subscriptions(client, {:subscribe, thing_name, operations, statii}, _thing_opts = %{qos: qos}) when is_binary(thing_name) do
topics = build_thing_shadow_topics(thing_name, operations, statii)
topics_with_qos = topics |> topic -> {topic, qos} end)
:emqttc.subscribe(client, topics_with_qos)
# Bug filed under
# :emqttc.sync_subscribe(client, topics_with_qos)
defp handle_subscriptions(client, {:unsubscribe, thing_name, operations, statii}, _thing_opts) when is_binary(thing_name) do
topics = build_thing_shadow_topics(thing_name, operations, statii)
:emqttc.unsubscribe(client, topics)
### Handles persistent subscribe and unsubscribe actions on thingshadow topics
defp handle_persistent_subscriptions(_client, _topic_params = {_, thing_name, _operations, _statii}, _thing_opts = %{persistent_subscribe: true}) when is_binary(thing_name) do
{:ok, :noop}
defp handle_persistent_subscriptions(client, topic_params = {_, thing_name, _operations, _statii}, thing_opts = %{persistent_subscribe: false}) when is_binary(thing_name) do
handle_subscriptions(client, topic_params, thing_opts)
### Publish thingshadow shadow_state_object to MQTT topic with or without versioning
defp publish_shadow_state_to_topic(client, topic, shadow_state_object, _current_thing = %{qos: qos, enable_versioning: true, version: version}) when is_binary(topic) do
# Enable_versioning is true and version is available
shadow_state_object = Map.put_new(shadow_state_object, :version, version)
# Encode shadow_state_object to json, and then Asynchronous publish to topic. If qos1, emqttc will queue the resend.
shadow_state_object_json = shadow_state_object |> Poison.Encoder.encode([]) |> to_string
:emqttc.publish(client, topic, shadow_state_object_json, qos)
defp publish_shadow_state_to_topic(client, topic, shadow_state_object, _current_thing = %{qos: qos, enable_versioning: _}) when is_binary(topic) do
# Version is unavailable or Enable_versioning is false
# Encode shadow_state_object to json, and then Asynchronous publish to topic. If qos1, emqttc will queue the resend.
shadow_state_object_json = shadow_state_object |> Poison.Encoder.encode([]) |> to_string
:emqttc.publish(client, topic, shadow_state_object_json, qos)
### Update version in thing map based on whether `new_version` is indeed newer than `current_version`
defp update_version_in_thing(current_thing = %{version: current_version, discard_stale: _}, new_version, _operation = :delete) when new_version <= current_version do
# New_version is older than current_version, but operation is delete
# Do not discard no matter the value of discard_stale option.
defp update_version_in_thing(current_thing = %{version: current_version, discard_stale: false}, new_version, _operation) when new_version <= current_version do
# New_version is older than current_version, but discard_stale option is false
# Do not discard as discard_stale = false
defp update_version_in_thing(_current_thing = %{version: current_version, discard_stale: true}, new_version, _operation) when new_version <= current_version do
# New_version is older than current_version, and discard_stale option is true
# Indicate discard new state by returning nil
defp update_version_in_thing(current_thing = %{version: current_version}, new_version, _operation) when new_version > current_version do
# New_version is newer than current_version
%{current_thing | version: new_version}
defp update_version_in_thing(current_thing, new_version, _operation) do
# Current_version is undefined
Map.put_new(current_thing, :version, new_version)
@doc """
Builds thingshadow topic according to AWS Spec
def build_thing_shadow_topic(thing_name, operation, status \\ nil)
def build_thing_shadow_topic(thing_name, operation = :update, status = :delta) do
def build_thing_shadow_topic(_thing_name, _operation, _status = :delta) do
# Delta is only valid for update operation.
def build_thing_shadow_topic(thing_name, operation, _status = nil) do
# Used to build a topic to publish to
def build_thing_shadow_topic(thing_name, operation, status) do
# Used to build a topic to subscribe to
@doc """
Builds multiple thingshadow topics at once and returns a flat list
def build_thing_shadow_topics(thing_name, operations, statii) when is_binary(thing_name) and is_list(operations) and is_list(statii) do
Enum.flat_map(operations, fn operation ->
|>, operation, &1))
|> Enum.filter(&(&1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment