Skip to content

Instantly share code, notes, and snippets.

@Safrone Safrone/mqtt_join_processor.py Secret
Created Feb 19, 2020

Embed
What would you like to do?
Processes LoRaWAN join uplinks received over MQTT and publishes join request/accept/reject/failure stats to stats/<dev_eui>/join
#!/usr/bin/env python3
import base64
import os
import uuid
from time import sleep
import re
import datetime
import json
import paho.mqtt.client as mqtt
# uplink = {
# "phyPayload":"AL3GraPODsb5iaAAAArKJQDuKVROe5Q=",
# "txInfo":{"frequency":904300000,"modulation":"LORA",
# "loRaModulationInfo":{"bandwidth":125,"spreadingFactor":10,"codeRate":"4/5","polarizationInversion":False}},
# "rxInfo":{"gatewayID":"wO5A//8pQ7w=","time":None,"timeSinceGPSEpoch":None,"rssi":-48,
# "loRaSNR":7,"channel":2,"rfChain":0,"board":0,"antenna":0,"location":None,
# "fineTimestampType":"NONE","context":"HLm7XA==","uplinkID":"voTVw3y3SYyeLTjnNkS7Ow==","crcStatus":"CRC_OK"}
# }
# Log-message fragments that end a join attempt, paired with the join_stats
# field each one is reported as. Checked in this order; the first match wins.
_JOIN_OUTCOMES = (
    # Device doesn't exist
    ('get device error: object does not exist', 'join_fail_no_device'),
    # Join accepted
    ('sent uplink meta-data to network-controller', 'join_accept'),
    # app_key failed
    ('join-request to join-server error: response error, code: MICFailed, '
     'description: invalid mic', 'join_fail_app_key'),
)

# Fields are pulled out by name instead of by position, so the syslog prefix
# (or any extra key=value pair on the line) cannot break the parsing.
# Example line:
# Feb 18 15:04:22 LoRaServer-production chirpstack-network-server[16404]: time="2020-02-18T15:04:22Z" level=info msg="uplink: frame(s) collected" ctx_id=513229da-fc64-4ab7-94b7-b8820edd648b mtype=JoinRequest uplink_ids="[6a61ae7f-59f5-4abb-a2f8-d783446c7f5f]"
_LOG_TIME_RE = re.compile(r'\btime="([^"]*)"')
_LOG_CTX_ID_RE = re.compile(r'\bctx_id=([^\s,"]+)')


def _log_timestamp_ns(line):
    """Return the line's time="..." field as a nanosecond epoch string, or None if absent/unparseable."""
    found = _LOG_TIME_RE.search(line)
    if found is None:
        return None
    try:
        logged_at = datetime.datetime.strptime(found.group(1).replace('Z', '+0000'), "%Y-%m-%dT%H:%M:%S%z")
    except ValueError:
        return None
    # Whole seconds padded with nine zeros, i.e. a nanosecond-precision timestamp.
    return "%d000000000" % int(logged_at.timestamp())


def process_gateway_uplink(uplink: dict, log_path: str = '/var/log/syslog', settle_seconds: float = 3):
    """Turn a gateway join-request uplink into join_stats lines by correlating it with the server log.

    The frame in ``uplink['phyPayload']`` (base64) is only processed when its
    first byte (MHDR) is 0x00, i.e. a join request. The log file is then
    scanned for the line that mentions this uplink's ID; that line yields a
    ``join_request`` point and the ``ctx_id`` used to find the outcome line
    (accepted / unknown device / invalid MIC) later in the same file.

    :param uplink: decoded gateway event with ``phyPayload`` and, for join
        requests, ``rxInfo.gatewayID`` and ``rxInfo.uplinkID`` (all base64).
    :param log_path: log file to scan; defaults to the system syslog.
    :param settle_seconds: time to block before reading the log so the join
        has been processed and logged; 0 (or negative) disables the wait.
    :return: ``(dev_eui, lines)``. ``dev_eui`` is the lowercase hex DevEUI, or
        ``None`` when the frame is empty, truncated or not a join request.
        ``lines`` holds zero, one or two ``join_stats`` strings.
    :raises KeyError: if ``phyPayload`` (or ``rxInfo`` fields of a join
        request) is missing.
    :raises OSError: if ``log_path`` cannot be opened.
    """
    byte_array = base64.b64decode(uplink['phyPayload'])
    # An empty payload has no MHDR byte to inspect; anything but 0x00 is not a join request.
    if not byte_array or byte_array[0] != 0x00:
        return None, []

    # Strip the 1-byte MHDR and the trailing 4-byte MIC.
    mac_payload = byte_array[1:-4]
    if len(mac_payload) < 16:
        # Too short to hold the two 8-byte EUIs; a partial DevEUI would be reported otherwise.
        return None, []

    # The DevEUI follows the 8-byte AppEUI and is byte-reversed in the frame.
    dev_eui = bytes(reversed(mac_payload[8:16])).hex()
    gateway_eui = base64.b64decode(uplink['rxInfo']['gatewayID']).hex()
    uplink_id = str(uuid.UUID(bytes=base64.b64decode(uplink['rxInfo']['uplinkID'])))
    tags = "dev_eui=%s,gateway=%s" % (dev_eui.lower(), gateway_eui.lower())

    # wait a little to ensure join has been processed
    if settle_seconds > 0:
        sleep(settle_seconds)

    output = []
    context_id = None
    # errors='replace': a stray undecodable byte in the log must not abort the scan.
    with open(log_path, 'r', errors='replace') as f:
        for line in f:
            if uplink_id in line:
                print(line)
                if 'mtype=JoinRequest' not in line:
                    continue
                timestamp = _log_timestamp_ns(line)
                ctx_match = _LOG_CTX_ID_RE.search(line)
                if timestamp is None or ctx_match is None:
                    continue
                output.append("join_stats,%s join_request=1i %s" % (tags, timestamp))
                context_id = ctx_match.group(1)
            elif context_id and context_id in line:
                field = next((name for marker, name in _JOIN_OUTCOMES if marker in line), None)
                if field is None:
                    continue
                timestamp = _log_timestamp_ns(line)
                if timestamp is None:
                    continue
                output.append("join_stats,%s %s=1i %s" % (tags, field, timestamp))
                # The outcome line closes this join attempt; nothing further to read.
                break
    return dev_eui, output
def on_connect(client, userdata, flags, rc):
    """Report the connect result code and subscribe to gateway uplink events.

    :param client: the MQTT client that connected; used to subscribe.
    :param userdata: unused.
    :param flags: unused.
    :param rc: result code of the connection attempt, printed as-is.
    """
    status_message = "Connected with result code %s" % rc
    print(status_message)

    # The subscription is made from this callback, not once at start-up, so
    # that it is re-established whenever the connection is lost and restored.
    uplink_topic = "gateway/+/event/up"
    client.subscribe(uplink_topic)
def on_message(client: mqtt.Client, userdata, msg):
    """Process one gateway uplink message and publish any resulting join stats.

    The message payload is decoded as JSON and handed to
    ``process_gateway_uplink``. When that returns a device EUI, every
    produced line is published to ``stats/<device_eui>/join``.

    :param client: MQTT client used to publish the results.
    :param userdata: unused.
    :param msg: received message; only ``msg.payload`` is read.
    """
    payload_text = msg.payload.decode()
    device_eui, publish = process_gateway_uplink(json.loads(payload_text))

    # No device EUI means the uplink produced nothing to report.
    if not device_eui:
        return

    stats_topic = "stats/%s/join" % device_eui
    for line in publish:
        client.publish(stats_topic, line)
if __name__ == '__main__':
    # Script entry point: wire the callbacks, authenticate with credentials
    # taken from the environment, then hand control to the MQTT network loop.
    client = mqtt.Client()
    client.on_message = on_message
    client.on_connect = on_connect

    # Either variable may be unset, in which case None is passed through.
    mqtt_user = os.getenv('MQTT_USER')
    mqtt_password = os.getenv('MQTT_PASSWORD')
    client.username_pw_set(mqtt_user, mqtt_password)

    # Broker address, port 1883, 60-second keepalive.
    client.connect("0.0.0.0", 1883, 60)
    # Blocks, dispatching to the callbacks registered above.
    client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.