Skip to content

Instantly share code, notes, and snippets.

@JF002
Created December 9, 2023 09:10
Show Gist options
  • Save JF002/0268e82d6200e390b2808fb816f5b1d7 to your computer and use it in GitHub Desktop.
Save JF002/0268e82d6200e390b2808fb816f5b1d7 to your computer and use it in GitHub Desktop.
Convert GPS data points from Seeedstudio LoRaWAN tracker from TTN MQTT to GeoJSON
import json
from datetime import datetime
# https://jsoneditoronline.org/#left=local.kuvewa
# https://files.seeedstudio.com/products/SenseCAP/SenseCAP_Tracker/SenseCAP_Tracker_T1000-AB_User_Guide.pdf
# https://github.com/Seeed-Solution/SenseCAP-Decoder/blob/main/T1000/TTN/SenseCAP_T1000_TTN_Decoder.js
file = open("data.log", "r")
lines = file.readlines()
count = 0
positions=[]
for line in lines:
try:
obj = json.loads(line)
if obj["end_device_ids"]["device_id"] == "tracker-t1000a-1":
uplink_message = obj["uplink_message"]
rx_metadata = uplink_message["rx_metadata"][0]
received_at = rx_metadata["received_at"]
uplink_message_settings_time = uplink_message["settings"]["time"]
uplink_message_received_at = uplink_message["received_at"]
rx_metadata_time = rx_metadata["time"]
rx_metadata_gps_time = rx_metadata["gps_time"]
rx_metadata_received_at = rx_metadata["received_at"]
rx_metadata_timestamp = rx_metadata["timestamp"]
rssi = str(rx_metadata["rssi"])
snr = str(rx_metadata["rssi"])
timestamp = 0
try:
timestamp = obj["uplink_message"]["decoded_payload"]["messages"][0][0]["timestamp"]
except:
pass
decoded_payload = obj["uplink_message"]["decoded_payload"]
messages = []
if decoded_payload.get("messages") != None:
messages = decoded_payload["messages"]
error_message = ""
if decoded_payload.get("errMessage") != None:
error_message = decoded_payload["errMessage"]
if len(messages) > 0 or len(error_message) > 0:
print("[" + str(count) + "]" + received_at + " --> " + datetime.fromtimestamp(timestamp / 1000).strftime(
'%c') + " -- RSSI: " + rssi + ", SNR = " + snr)
count = count + 1
messageCount = 0
for message in messages:
latitude = None
longitude = None
for measurement in message:
print("\t[" + str(messageCount) + "] " + measurement["measurementId"] + " = " + str(measurement["measurementValue"]))
if measurement["measurementId"] == "4197":
longitude = measurement["measurementValue"]
if measurement["measurementId"] == "4198":
latitude = measurement["measurementValue"]
if latitude is not None and longitude is not None:
positions.append({"timestamp": timestamp, "longitude": longitude, "latitude": latitude})
messageCount = messageCount + 1
if len(error_message) > 0:
print("\tError : " + error_message)
except:
pass
filePositions = open("positions.txt", "w")
filePositions.write("{\n \"type\": \"FeatureCollection\",\n\t\"features\": [\n\t\t{\n")
filePositions.write("\"type\": \"Feature\",\n\"geometry\": {\n\"type\": \"MultiLineString\",\n\"coordinates\": [\n[\n")
print("timestamp;long;lat")
#filePositions.write("timestamp;long;lat\n")
c = 0
positions_sorted = sorted(positions, key=lambda p: p["timestamp"])
for position in positions_sorted:
#filePositions.write(datetime.fromtimestamp(position["timestamp"] / 1000).strftime('%c') + ";" + str(position["longitude"]) + ";" + str(position["latitude"]) + "\n")
if c < len(positions) - 1:
filePositions.write("[" + str(position["longitude"]) + "," + str(position["latitude"]) + "],\n")
else:
filePositions.write("[" + str(position["longitude"]) + "," + str(position["latitude"]) + "]\n")
print(datetime.fromtimestamp(position["timestamp"] / 1000).strftime('%c') + ";" + str(position["longitude"]) + ";" + str(position["latitude"]))
c = c+1
filePositions.write("]]}}]}")
filePositions.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment