Skip to content

Instantly share code, notes, and snippets.

@ejohnso49
Created June 23, 2023 15:48
Show Gist options
  • Save ejohnso49/0d46b2593230818162be64cf53a726bc to your computer and use it in GitHub Desktop.
Save ejohnso49/0d46b2593230818162be64cf53a726bc to your computer and use it in GitHub Desktop.
Simple MQTT -> Memfault Forwarder
import paho.mqtt.client as mqtt
import requests
# MQTT broker configuration
broker_address = "localhost"
broker_port = 1883
broker_username = "test"
broker_password = "test1234"
# Memfault HTTP endpoint configuration
memfault_api_key = "your_api_key"
memfault_project_key = "your_project_key"
memfault_chunks_endpoint = "https://chunks.memfault.com/api/v0/chunks"
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT broker")
client.subscribe("memfault/+/chunks") # Subscribe to dynamic device serial numbers
else:
print("Connection failed")
def on_message(client, userdata, msg):
topic_parts = msg.topic.split("/")
if len(topic_parts) == 3 and topic_parts[0] == "memfault" and topic_parts[2] == "chunks":
device_serial = topic_parts[1]
process_memfault_chunk(device_serial, msg.payload.decode())
def process_memfault_chunk(device_serial, chunk_data):
# Construct the path for the Memfault chunks API endpoint based on the device serial number
endpoint = f"{memfault_chunks_endpoint}/{device_serial}"
# Forward the Memfault chunk data to the Memfault chunks API endpoint
headers = {
"Content-Type": "application/octet-stream",
"Memfault-Project-Key": memfault_project_key,
}
response = requests.post(endpoint, data=chunk_data, headers=headers)
if response.status_code == 200:
print(f"Memfault chunk for device {device_serial} forwarded successfully")
else:
print(f"Failed to forward Memfault chunk for device {device_serial}:", response.text)
client = mqtt.Client()
client.username_pw_set(broker_username, broker_password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, 60)
print(client.publish("topic/test",'Cedalo Mosquitto is awesome'))
client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment