Skip to content

Instantly share code, notes, and snippets.

@dennisheitmann
Created June 7, 2021 08:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dennisheitmann/65e6eac0f724497d4bae839a18d36e38 to your computer and use it in GitHub Desktop.
Save dennisheitmann/65e6eac0f724497d4bae839a18d36e38 to your computer and use it in GitHub Desktop.
Check own MQTT server using AWS lambda and send alarm messages using AWS SNS in case of failure
"""
Check if own MQTT server runs
"""
import json
import paho
import boto3
import paho.mqtt.client as mqtt
import ssl
import time
import os
import datetime
import urllib
endpoint = os.environ.get('endpoint')
username = os.environ.get('username')
password = os.environ.get('password')
snsmessage = {
'topic': os.environ.get('snsmessagetopic'),
'body': "HEARTBEAT failed"
}
rcv = False
ssmClient = boto3.client('ssm')
def on_connect(client, userdata, flags, rc):
client.subscribe("#")
def on_message(client, userdata, msg):
receiveTime = datetime.datetime.utcnow()
message = msg.payload.decode("utf-8")
global rcv
rcv = True
def lambda_handler(event, context):
client = mqtt.Client()
client.username_pw_set(username=username, password=password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(endpoint, 1883, 10)
client.loop_start()
time.sleep(15) # wait
client.loop_stop()
if rcv == False:
send_to_sns(snsmessage)
return("ALERT: alert message sent")
else:
return("OK: no alert message sent")
def send_to_sns(message):
sns = boto3.client('sns')
sns.publish(
TopicArn=message['topic'],
Message=message['body']
)
return
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment