Skip to content

Instantly share code, notes, and snippets.

@deeja
Created October 16, 2019 22:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save deeja/95d8fa00f23855f6cd65bd3d0ceb048f to your computer and use it in GitHub Desktop.
Save deeja/95d8fa00f23855f6cd65bd3d0ceb048f to your computer and use it in GitHub Desktop.
AWS IoT pushing through led colours on the Pi Zero Blinkt
# Need to set the policies for the subscribe and client ID within AWS
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json
# > pip install blinkt
import blinkt
# Custom MQTT message callback
index = 0
blinkt.set_clear_on_exit()
blinkt.set_brightness(0.1)
def customCallback(client, userdata, message):
global index
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")
blinkt.clear()
blinkt.set_pixel(index,255,0,0)
blinkt.show()
index = (index + 1) % 8
# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="lightchanger",
help="Targeted client id")
args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = "light/change"
if args.useWebsocket and args.certificatePath and args.privateKeyPath:
parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
exit(2)
if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
parser.error("Missing credentials for authentication.")
exit(2)
# Port defaults
if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443
port = 443
if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883
port = 8883
# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)
# Init AWSIoTMQTTClient
client = None
if useWebsocket:
client = AWSIoTMQTTClient(clientId, useWebsocket=True)
client.configureEndpoint(host, port)
client.configureCredentials(rootCAPath)
else:
client = AWSIoTMQTTClient(clientId)
client.configureEndpoint(host, port)
client.configureCredentials(rootCAPath, privateKeyPath, certificatePath)
# AWSIoTMQTTClient connection configuration
client.configureAutoReconnectBackoffTime(1, 32, 20)
client.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
client.configureDrainingFrequency(2) # Draining: 2 Hz
client.configureConnectDisconnectTimeout(10) # 10 sec
client.configureMQTTOperationTimeout(5) # 5 sec
# Connect and subscribe to AWS IoT
client.connect()
client.subscribe(topic, 1, customCallback)
time.sleep(2)
while True:
print("Waiting")
time.sleep(10)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment