Created
September 12, 2020 13:10
-
-
Save TamojitSaha/c39d8f5f230ac195344d8fda1d908b69 to your computer and use it in GitHub Desktop.
Use AWS IoT Device SDK for Python to subscribe to a topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import threading
import time as t

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
# Connection settings. The endpoint and credential paths below look like
# placeholders; replace them with your own values before running.
ENDPOINT = "xxxxxxxxxx-ats.iot.us-east-2.amazonaws.com"
CLIENT_ID = "testDevice"
PATH_TO_CERT = "your_cert.crt"
PATH_TO_KEY = "your_cert_key.key"
PATH_TO_ROOT = "AmazonRootCAx.pem"
# MESSAGE and TOPIC are only referenced by the commented-out publish example.
# MESSAGE = "Hello World"
# TOPIC = "test/testing"
# Number of messages to receive before the script stops waiting.
RANGE = 20
subscribeTopic = "topic_filter/topic"
# Running total of received messages, incremented by on_message_received.
received_count = 0
# Set by on_message_received once received_count equals RANGE.
received_all_event = threading.Event()
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    """Print an incoming message and count it towards RANGE.

    Args:
        topic: Topic the message arrived on.
        payload: Message payload; printed as received.
        **kwargs: Any extra keyword arguments from the caller; ignored here.
    """
    global received_count
    print(f"Received message from topic '{topic}': {payload}")
    received_count += 1
    # Exact equality is deliberate: with RANGE = 0 the event is never set,
    # so the main script keeps waiting indefinitely.
    if received_count == RANGE:
        received_all_event.set()
if __name__ == '__main__':
    # Script flow: connect over mutual TLS, subscribe to subscribeTopic,
    # block until RANGE messages have been received, then disconnect.

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        cert_filepath=PATH_TO_CERT,
        pri_key_filepath=PATH_TO_KEY,
        client_bootstrap=client_bootstrap,
        ca_filepath=PATH_TO_ROOT,
        client_id=CLIENT_ID,
        clean_session=False,
        keep_alive_secs=30,
    )
    print(f"Connecting to {ENDPOINT} with client ID '{CLIENT_ID}'...")
    # Make the connect() call
    connect_future = mqtt_connection.connect()
    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Optional publish example, kept disabled. Note that it ends by
    # disconnecting, so enabling it as-is would close the connection before
    # the subscription below is made.
    # Publish message to server desired number of times.
    # print('Begin Publish')
    # for i in range (RANGE):
    #     data = "{} [{}]".format(MESSAGE, i+1)
    #     message = {"message" : data}
    #     mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
    #     print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'")
    #     t.sleep(0.1)
    # print('Publish End')
    # disconnect_future = mqtt_connection.disconnect()
    # disconnect_future.result()

    # Subscribe using the topic defined at module level (subscribeTopic).
    print(f"Subscribing to topic '{subscribeTopic}'...")
    subscribe_future, _packet_id = mqtt_connection.subscribe(
        topic=subscribeTopic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)
    subscribe_result = subscribe_future.result()
    # !s keeps the explicit str() conversion of the granted QoS value.
    print(f"\nSubscribed with {subscribe_result['qos']!s}")

    # Wait for all messages to be received.
    # This waits forever if RANGE was set to 0: the wait itself is
    # unconditional, only the progress message is skipped.
    if RANGE != 0 and not received_all_event.is_set():
        print("\nWaiting for all messages to be received...")
    received_all_event.wait()
    print(f"\n{received_count} message(s) received.")

    # Disconnect
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print("Disconnected!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment