Skip to content

Instantly share code, notes, and snippets.

@TamojitSaha
Created September 12, 2020 13:10
Show Gist options
  • Save TamojitSaha/c39d8f5f230ac195344d8fda1d908b69 to your computer and use it in GitHub Desktop.
Save TamojitSaha/c39d8f5f230ac195344d8fda1d908b69 to your computer and use it in GitHub Desktop.
Use AWS IoT Device SDK for Python to subscribe to a topic
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json
import threading
# --- Connection settings -----------------------------------------------------
# Placeholder values: replace them with your own AWS IoT endpoint, client ID
# and the paths to the device certificate, private key and root CA file.
ENDPOINT = "xxxxxxxxxx-ats.iot.us-east-2.amazonaws.com"
CLIENT_ID = "testDevice"
PATH_TO_CERT = "your_cert.crt"
PATH_TO_KEY = "your_cert_key.key"
PATH_TO_ROOT = "AmazonRootCAx.pem"

# Publish-side settings; unused by this subscribe-only script and kept
# commented out for reference.
# MESSAGE = "Hello World"
# TOPIC = "test/testing"

# --- Subscription settings ---------------------------------------------------
# Topic filter passed to the subscribe call.
subscribeTopic = "topic_filter/topic"
# Number of received messages after which `received_all_event` is set.
RANGE = 20

# --- Receive-side state ------------------------------------------------------
# Running count of messages delivered to the message callback.
received_count = 0
# Set by the message callback once `received_count` reaches RANGE.
received_all_event = threading.Event()
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    """Log one incoming message and signal once RANGE messages have arrived.

    Args:
        topic: Topic the message was received on.
        payload: Message payload, printed as-is.
        **kwargs: Any extra keyword arguments supplied by the caller; ignored.
    """
    global received_count

    print("Received message from topic '{}': {}".format(topic, payload))

    received_count = received_count + 1
    # Signal only on the exact RANGE-th message; later messages are still
    # counted but do not touch the event again.
    if received_count != RANGE:
        return
    received_all_event.set()
def main():
    """Connect to AWS IoT, subscribe to ``subscribeTopic`` and wait for messages.

    Builds an MQTT connection from the module-level ENDPOINT / CLIENT_ID /
    certificate path settings, subscribes with ``on_message_received`` as the
    callback, blocks until ``received_all_event`` is set, and then disconnects.
    The disconnect runs even if subscribing or waiting is interrupted.

    Each ``.result()`` call blocks on the corresponding future; presumably it
    re-raises the failure if the operation failed (confirm against the awscrt
    documentation).
    """
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        cert_filepath=PATH_TO_CERT,
        pri_key_filepath=PATH_TO_KEY,
        client_bootstrap=client_bootstrap,
        ca_filepath=PATH_TO_ROOT,
        client_id=CLIENT_ID,
        clean_session=False,
        keep_alive_secs=30,
    )

    print("Connecting to {} with client ID '{}'...".format(
        ENDPOINT, CLIENT_ID))
    # Future.result() waits until a result is available
    connect_future = mqtt_connection.connect()
    connect_future.result()
    print("Connected!")

    try:
        # This script only subscribes. The topic comes from `subscribeTopic`;
        # the previous code referenced an undefined `publishTopic` and raised
        # NameError right after connecting.
        print("Subscribing to topic '{}'...".format(subscribeTopic))
        subscribe_future, _packet_id = mqtt_connection.subscribe(
            topic=subscribeTopic,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_message_received)
        subscribe_result = subscribe_future.result()
        print("\nSubscribed with {}".format(str(subscribe_result['qos'])))

        # Wait for all messages to be received.
        # This waits forever if RANGE was set to 0, because the callback only
        # sets the event when the count reaches RANGE after an increment.
        if RANGE != 0 and not received_all_event.is_set():
            print("\nWaiting for all messages to be received...")
        received_all_event.wait()
        print("\n{} message(s) received.".format(received_count))
    finally:
        # Always release the connection, including on Ctrl-C or a failed
        # subscribe.
        print("Disconnecting...")
        disconnect_future = mqtt_connection.disconnect()
        disconnect_future.result()
        print("Disconnected!")


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment