-
-
Save quimnut/c5ae707c627ed5434b300f6e8a40ccf7 to your computer and use it in GitHub Desktop.
meshtastic mqtt decoder, for unicode.,
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import paho.mqtt.client as mqtt_client | |
from meshtastic import mesh_pb2, mqtt_pb2, portnums_pb2, telemetry_pb2, BROADCAST_NUM | |
import base64 | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
from google.protobuf.json_format import MessageToJson | |
import json | |
config = { | |
'broker': { | |
'host': '10.69.69.69', | |
'port': 1883, | |
'client_id': 'meshinfo', | |
'in_topic': 'msh/ANZ/2/e/#', | |
'out_topic': 'msh/decoded/ANZ', | |
'username': 'username', | |
'password': 'password', | |
}, | |
} | |
key = "AQ==" | |
key_hash = "1PG7OiApB1nwvP+rz05pAQ==" # AQ== | |
def connect_mqtt(broker, port, client_id, username, password): | |
def on_connect(client, userdata, flags, rc, properties=None): | |
global mqtt_connect_time | |
if rc == 0: | |
print("Connected to MQTT broker") | |
else: | |
print("Failed to connect, error: %s\n" % rc) | |
client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION2) | |
client.on_connect = on_connect | |
# client.username_pw_set(username, password) | |
client.connect(broker, port) | |
return client | |
def publish(client, topic, msg): | |
result = client.publish(topic, msg) | |
status = result[0] | |
if status == 0: | |
print(f"Send `{msg}` to topic `{topic}`") | |
print("Done!") | |
return True | |
else: | |
print(f"Failed to send message to topic {topic}") | |
return False | |
def subscribe(client, topic): | |
def on_message(client, userdata, msg, properties=None): | |
try: | |
se = mqtt_pb2.ServiceEnvelope() | |
is_encrypted = False | |
se.ParseFromString(msg.payload) | |
mp = se.packet | |
outs = json.loads(MessageToJson(mp)) | |
except Exception as e: | |
print(f"*** ParseFromString: {str(e)}") | |
if mp.HasField("encrypted") and not mp.HasField("decoded"): | |
try: | |
key_bytes = base64.b64decode(key_hash.encode('ascii')) | |
nonce_packet_id = getattr(mp, "id").to_bytes(8, "little") | |
nonce_from_node = getattr(mp, "from").to_bytes(8, "little") | |
nonce = nonce_packet_id + nonce_from_node | |
cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend()) | |
decryptor = cipher.decryptor() | |
decrypted_bytes = decryptor.update(getattr(mp, "encrypted")) + decryptor.finalize() | |
data = mesh_pb2.Data() | |
data.ParseFromString(decrypted_bytes) | |
mp.decoded.CopyFrom(data) | |
except Exception as e: | |
print(f"*** Decryption failed: {str(e)}") | |
return | |
is_encrypted=True | |
if mp.decoded.portnum == portnums_pb2.TEXT_MESSAGE_APP: | |
text = mp.decoded.payload.decode("utf-8") | |
payload = { "text": text } | |
outs["type"] = "text" | |
outs["payload"] = payload | |
print(outs) | |
publish(client, config['broker']['out_topic'], json.dumps(outs)) | |
elif mp.decoded.portnum == portnums_pb2.NODEINFO_APP: | |
info = mesh_pb2.User() | |
info.ParseFromString(mp.decoded.payload) | |
out = json.loads(MessageToJson(info)) | |
outs["type"] = "nodeinfo" | |
outs["payload"] = out | |
print(outs) | |
publish(client, config['broker']['out_topic'], json.dumps(outs)) | |
elif mp.decoded.portnum == portnums_pb2.TRACEROUTE_APP: | |
out = json.loads(MessageToJson(mp.decoded)) | |
outs["type"] = "traceroute" | |
outs["payload"] = out | |
print(outs) | |
publish(client, config['broker']['out_topic'], json.dumps(outs)) | |
elif mp.decoded.portnum == portnums_pb2.POSITION_APP: | |
pos = mesh_pb2.Position() | |
pos.ParseFromString(mp.decoded.payload) | |
out = json.loads(MessageToJson(pos)) | |
outs["type"] = "position" | |
outs["payload"] = out | |
print(outs) | |
publish(client, config['broker']['out_topic'], json.dumps(outs)) | |
elif mp.decoded.portnum == portnums_pb2.TELEMETRY_APP: | |
env = telemetry_pb2.Telemetry() | |
env.ParseFromString(mp.decoded.payload) | |
out = json.loads(MessageToJson(env)) | |
outs["type"] = "telemetry" | |
if 'deviceMetrics' in out: | |
outs["payload"] = out['deviceMetrics'] | |
if 'environmentMetrics' in out: | |
outs["payload"] = out['environmentMetrics'] | |
print(outs) | |
publish(client, config['broker']['out_topic'], json.dumps(outs)) | |
client.subscribe(topic) | |
client.on_message = on_message | |
def run(): | |
client = connect_mqtt(config['broker']['host'], config['broker']['port'], config['broker']['client_id'], config['broker']['username'], config['broker']['password']) | |
subscribe(client, config['broker']['in_topic']) | |
client.loop_forever() | |
if __name__ == "__main__": | |
run() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I want the unicode but want to use it in other apps like HA or node-red.
Looks like traceroute and mac addrs need more work.
And align json keys so it hot-plugs? with other apps like ha
Borrowed heavily from;
https://github.com/pdxlocations/Meshtastic-MQTT-Connect
https://github.com/kevinelliott/meshinfo