Skip to content

Instantly share code, notes, and snippets.

@mattronix
Last active August 17, 2023 22:26
Show Gist options
  • Save mattronix/93b73fc43ef52b5e31fe0b7236c719ae to your computer and use it in GitHub Desktop.
Save mattronix/93b73fc43ef52b5e31fe0b7236c719ae to your computer and use it in GitHub Desktop.
Converts Teltonika Modem HTTP Post GPS (NMEA) to COT ATAK
#!/usr/bin/env python3
"""
License: MIT License
Copyright (c) 2023 Matthew Frost
Very simple HTTP server in python for converting Teltonika GPS data to ATAK CoT
Usage::
./server.py [<port>]
"""
from http.server import BaseHTTPRequestHandler, HTTPServer
import logging
from pynmeagps import NMEAReader
from configparser import ConfigParser
import pytak
import socket
import uuid
import datetime
from xml.etree import ElementTree as ET
from xml.dom import minidom
from urllib.parse import urlparse, parse_qs
auth_list = [{"serial" : "1234567", "callsign":"TRUCK-01"}]
def check_serial_validity(serial_to_check, auth_list):
for auth_dict in auth_list:
if "serial" in auth_dict and auth_dict["serial"] == serial_to_check:
return auth_dict
return None # SERIAL not found in the auth_list
def generate_cot(uid, lat, lon, hae, callsign, type, time=None):
if time is None:
time = datetime.datetime.utcnow()
cot = ET.Element('event')
cot.set('version', '2.0')
cot.set('uid', str(uid))
#cot.set('type', 'a-f-G-U-C')
cot.set('type', 'a-u-G"')
usericon = ET.SubElement(cot, 'iconsetpath')
usericon.set('iconsetpath', 'ad78aafb-83a6-4c07-b2b9-a897a8b6a38f/SqrNum/03 Square Invert.png')
point = ET.SubElement(cot, 'point')
point.set('lat', str(lat))
point.set('lon', str(lon))
point.set('hae', str(hae))
detail = ET.SubElement(cot, 'detail')
contact = ET.SubElement(detail, 'contact')
contact.set('callsign', callsign)
contact.set('endpoint', 'sip:example@example.com')
contact.set('type', type)
time_element = ET.SubElement(cot, 'time')
time_element.set('stale', '300') # Stale time in seconds
time_element.text = time.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
rough_string = ET.tostring(cot, 'utf-8')
reparsed = minidom.parseString(rough_string)
return reparsed.toprettyxml(indent="\t")
def send_cot_to_atak_server(cot_message, server_address, server_port):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.sendto(cot_message.encode(), (server_address, server_port))
print("CoT message sent to ATAK server.")
class S(BaseHTTPRequestHandler):
def _set_response(self):
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
def do_POST(self):
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data
post_data = self.rfile.read(content_length) # <--- Gets the data itself
decoded_data = post_data.decode()
parsed_url = urlparse(self.path)
query_params = parse_qs(parsed_url.query)
gps_data = ""
if query_params:
print(query_params)
if not query_params["serial_num"]:
print("no serial_num number in request")
return
tracker = matching_object = check_serial_validity(query_params["serial_num"][0], auth_list)
if not tracker:
print("not on auth list")
return
for line in decoded_data.split('$'):
line = "$" + line
try:
nmea = NMEAReader.parse(line)
if not nmea.lat:
print("no lat data found")
continue
if not nmea.lon:
print("no lon data found")
continue
gps_data = nmea
# ATAK server address and port
server_address = 'localhost'
server_port = 8087
# Example CoT parameters
uid = str(tracker['serial'])
lat = gps_data.lat
lon = gps_data.lon
hae = 0.0
callsign = str(tracker['callsign'])
type = "a-f-G-U-C"
# Generate the CoT message
cot_message = generate_cot(uid, lat, lon, hae, callsign, type)
print(cot_message)
# Send the CoT message to the ATAK server using UDP
send_cot_to_atak_server(cot_message, server_address, server_port)
break
except Exception as E:
print(f"failed to parse nmea: {E}")
continue
def run(server_class=HTTPServer, handler_class=S, port=8080):
logging.basicConfig(level=logging.INFO)
server_address = ('', port)
httpd = server_class(server_address, handler_class)
logging.info('Starting httpd...\n')
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
logging.info('Stopping httpd...\n')
if __name__ == '__main__':
from sys import argv
if len(argv) == 2:
run(port=int(argv[1]))
else:
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment