Skip to content

Instantly share code, notes, and snippets.

@quimnut

quimnut/mqtt.py Secret

Created June 6, 2024 08:56
Show Gist options
  • Save quimnut/c5ae707c627ed5434b300f6e8a40ccf7 to your computer and use it in GitHub Desktop.
Save quimnut/c5ae707c627ed5434b300f6e8a40ccf7 to your computer and use it in GitHub Desktop.
meshtastic mqtt decoder, for unicode.,
#!/usr/bin/env python3
import paho.mqtt.client as mqtt_client
from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2, BROADCAST_NUM
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from google.protobuf.json_format import MessageToJson
import json
config = {
'broker': {
'host': '10.69.69.69',
'port': 1883,
'client_id': 'meshinfo',
'in_topic': 'msh/ANZ/2/e/#',
'out_topic': 'msh/decoded/ANZ',
'username': 'username',
'password': 'password',
},
}
key = "AQ=="
key_hash = "1PG7OiApB1nwvP+rz05pAQ==" # AQ==
def connect_mqtt(broker, port, client_id, username, password):
def on_connect(client, userdata, flags, rc, properties=None):
global mqtt_connect_time
if rc == 0:
print("Connected to MQTT broker")
else:
print("Failed to connect, error: %s\n" % rc)
client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2)
client.on_connect = on_connect
# client.username_pw_set(username, password)
client.connect(broker, port)
return client
def publish(client, topic, msg):
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
print("Done!")
return True
else:
print(f"Failed to send message to topic {topic}")
return False
def subscribe(client, topic):
def on_message(client, userdata, msg, properties=None):
try:
se = mqtt_pb2.ServiceEnvelope()
is_encrypted = False
se.ParseFromString(msg.payload)
mp = se.packet
outs = json.loads(MessageToJson(mp))
except Exception as e:
print(f"*** ParseFromString: {str(e)}")
if mp.HasField("encrypted") and not mp.HasField("decoded"):
try:
key_bytes = base64.b64decode(key_hash.encode('ascii'))
nonce_packet_id = getattr(mp, "id").to_bytes(8, "little")
nonce_from_node = getattr(mp, "from").to_bytes(8, "little")
nonce = nonce_packet_id + nonce_from_node
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
decryptor = cipher.decryptor()
decrypted_bytes = decryptor.update(getattr(mp, "encrypted")) + decryptor.finalize()
data = mesh_pb2.Data()
data.ParseFromString(decrypted_bytes)
mp.decoded.CopyFrom(data)
except Exception as e:
print(f"*** Decryption failed: {str(e)}")
return
is_encrypted=True
if mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP:
text = mp.decoded.payload.decode("utf-8")
payload = { "text": text }
outs["type"] = "text"
outs["payload"] = payload
print(outs)
publish(client, config['broker']['out_topic'], json.dumps(outs))
elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP:
info = mesh_pb2.User()
info.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(info))
outs["type"] = "nodeinfo"
outs["payload"] = out
print(outs)
publish(client, config['broker']['out_topic'], json.dumps(outs))
elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP:
out = json.loads(MessageToJson(mp.decoded))
outs["type"] = "traceroute"
outs["payload"] = out
print(outs)
publish(client, config['broker']['out_topic'], json.dumps(outs))
elif mp.decoded.portnum == portnums_pb2.POSITION_APP:
pos = mesh_pb2.Position()
pos.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(pos))
outs["type"] = "position"
outs["payload"] = out
print(outs)
publish(client, config['broker']['out_topic'], json.dumps(outs))
elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP:
env = telemetry_pb2.Telemetry()
env.ParseFromString(mp.decoded.payload)
out = json.loads(MessageToJson(env))
outs["type"] = "telemetry"
if 'deviceMetrics' in out:
outs["payload"] = out['deviceMetrics']
if 'environmentMetrics' in out:
outs["payload"] = out['environmentMetrics']
print(outs)
publish(client, config['broker']['out_topic'], json.dumps(outs))
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt(config['broker']['host'], config['broker']['port'], config['broker']['client_id'], config['broker']['username'], config['broker']['password'])
subscribe(client, config['broker']['in_topic'])
client.loop_forever()
if __name__ == "__main__":
run()
@quimnut
Copy link
Author

quimnut commented Jun 6, 2024

I want the unicode but want to use it in other apps like HA or node-red.
Looks like traceroute and mac addrs need more work.
And align json keys so it hot-plugs? with other apps like ha

Borrowed heavily from;
https://github.com/pdxlocations/Meshtastic-MQTT-Connect
https://github.com/kevinelliott/meshinfo

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment