Last active
August 17, 2023 22:26
-
-
Save mattronix/93b73fc43ef52b5e31fe0b7236c719ae to your computer and use it in GitHub Desktop.
Converts Teltonika Modem HTTP Post GPS (NMEA) to COT ATAK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
License: MIT License | |
Copyright (c) 2023 Matthew Frost | |
Very simple HTTP server in python for converting Teltonika GPS data to ATAK CoT | |
Usage:: | |
./server.py [<port>] | |
""" | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
import logging | |
from pynmeagps import NMEAReader | |
from configparser import ConfigParser | |
import pytak | |
import socket | |
import uuid | |
import datetime | |
from xml.etree import ElementTree as ET | |
from xml.dom import minidom | |
from urllib.parse import urlparse, parse_qs | |
auth_list = [{"serial" : "1234567", "callsign":"TRUCK-01"}] | |
def check_serial_validity(serial_to_check, auth_list): | |
for auth_dict in auth_list: | |
if "serial" in auth_dict and auth_dict["serial"] == serial_to_check: | |
return auth_dict | |
return None # SERIAL not found in the auth_list | |
def generate_cot(uid, lat, lon, hae, callsign, type, time=None): | |
if time is None: | |
time = datetime.datetime.utcnow() | |
cot = ET.Element('event') | |
cot.set('version', '2.0') | |
cot.set('uid', str(uid)) | |
#cot.set('type', 'a-f-G-U-C') | |
cot.set('type', 'a-u-G"') | |
usericon = ET.SubElement(cot, 'iconsetpath') | |
usericon.set('iconsetpath', 'ad78aafb-83a6-4c07-b2b9-a897a8b6a38f/SqrNum/03 Square Invert.png') | |
point = ET.SubElement(cot, 'point') | |
point.set('lat', str(lat)) | |
point.set('lon', str(lon)) | |
point.set('hae', str(hae)) | |
detail = ET.SubElement(cot, 'detail') | |
contact = ET.SubElement(detail, 'contact') | |
contact.set('callsign', callsign) | |
contact.set('endpoint', 'sip:example@example.com') | |
contact.set('type', type) | |
time_element = ET.SubElement(cot, 'time') | |
time_element.set('stale', '300') # Stale time in seconds | |
time_element.text = time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') | |
rough_string = ET.tostring(cot, 'utf-8') | |
reparsed = minidom.parseString(rough_string) | |
return reparsed.toprettyxml(indent="\t") | |
def send_cot_to_atak_server(cot_message, server_address, server_port): | |
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: | |
sock.sendto(cot_message.encode(), (server_address, server_port)) | |
print("CoT message sent to ATAK server.") | |
class S(BaseHTTPRequestHandler): | |
def _set_response(self): | |
self.send_response(200) | |
self.send_header('Content-type', 'text/html') | |
self.end_headers() | |
def do_POST(self): | |
content_length = int(self.headers['Content-Length']) # <--- Gets the size of data | |
post_data = self.rfile.read(content_length) # <--- Gets the data itself | |
decoded_data = post_data.decode() | |
parsed_url = urlparse(self.path) | |
query_params = parse_qs(parsed_url.query) | |
gps_data = "" | |
if query_params: | |
print(query_params) | |
if not query_params["serial_num"]: | |
print("no serial_num number in request") | |
return | |
tracker = matching_object = check_serial_validity(query_params["serial_num"][0], auth_list) | |
if not tracker: | |
print("not on auth list") | |
return | |
for line in decoded_data.split('$'): | |
line = "$" + line | |
try: | |
nmea = NMEAReader.parse(line) | |
if not nmea.lat: | |
print("no lat data found") | |
continue | |
if not nmea.lon: | |
print("no lon data found") | |
continue | |
gps_data = nmea | |
# ATAK server address and port | |
server_address = 'localhost' | |
server_port = 8087 | |
# Example CoT parameters | |
uid = str(tracker['serial']) | |
lat = gps_data.lat | |
lon = gps_data.lon | |
hae = 0.0 | |
callsign = str(tracker['callsign']) | |
type = "a-f-G-U-C" | |
# Generate the CoT message | |
cot_message = generate_cot(uid, lat, lon, hae, callsign, type) | |
print(cot_message) | |
# Send the CoT message to the ATAK server using UDP | |
send_cot_to_atak_server(cot_message, server_address, server_port) | |
break | |
except Exception as E: | |
print(f"failed to parse nmea: {E}") | |
continue | |
def run(server_class=HTTPServer, handler_class=S, port=8080): | |
logging.basicConfig(level=logging.INFO) | |
server_address = ('', port) | |
httpd = server_class(server_address, handler_class) | |
logging.info('Starting httpd...\n') | |
try: | |
httpd.serve_forever() | |
except KeyboardInterrupt: | |
pass | |
httpd.server_close() | |
logging.info('Stopping httpd...\n') | |
if __name__ == '__main__': | |
from sys import argv | |
if len(argv) == 2: | |
run(port=int(argv[1])) | |
else: | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment