Skip to content

Instantly share code, notes, and snippets.

@bogardpd
Created Dec 22, 2021
Embed
What would you like to do?
"""Updates canonical KML file and imports GPX if provided."""
import argparse
import gpxpy
import json
import os
import simplekml
import sys
import traceback
from datetime import timedelta, timezone
from dateutil.parser import parse, isoparse
from lxml import etree
# This script will generate both a KML file (to act as the canonical
# storage for driving data in a human readible format) and a KMZ file
# (to have a smaller filesize for loading over a network). The KML file
# will be read when merging new data.
CANONICAL_KML_FILE = "driving_canonical.kml"
OUTPUT_KMZ_FILE = "Driving.kmz"
IGNORE_FILE = "ignore.json"
with open(IGNORE_FILE, 'r') as f:
IGNORE = json.load(f)
MAX_TIME = timedelta(minutes=10) # Merge segments with small time gaps
MIN_POINTS = 1 # Only keep linestrings with at least this many points
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%SZ"
NSMAP = {None: "http://www.opengis.net/kml/2.2"}
def main(args):
kml_dict = kml_to_dict(CANONICAL_KML_FILE)
if args.gpx_file is not None:
# GPX file was provided; parse and merge it.
gpx_dict = gpx_to_dict(args.gpx_file)
tracks_dict = merge_tracks(kml_dict, gpx_dict)
else:
# No GPX file was provided; just refresh canonical KML.
tracks_dict = kml_dict
export_kml(tracks_dict, CANONICAL_KML_FILE, False, False)
export_kml(tracks_dict, OUTPUT_KMZ_FILE, True, True)
def export_kml(kml_dict, output_file, zipped=False, merge_folder_tracks=False):
"""
Exports KML data to a KML (if zipped=False) or KMZ file located at
output_file. Will merge LineStrings within a Folder into a single
root-level LineString if merge_folder_tracks is set.
"""
filetype = "KMZ" if zipped else "KML"
print(f"Creating {filetype} file…")
kml = simplekml.Kml()
kml.document.name = "Driving" if zipped else "driving_canonical"
track_style = simplekml.Style()
track_style.linestyle.width = 4
if zipped:
track_style.linestyle.color = simplekml.Color.red
else:
track_style.linestyle.color = simplekml.Color.magenta
def dict_to_linestring(parent, timestamp, values):
line = parent.newlinestring(
name=timestamp.strftime(TIMESTAMP_FORMAT),
tessellate=1,
description=values['description'],
coords=values['coords']
)
line.style = track_style
line.timestamp = simplekml.TimeStamp(when=timestamp.isoformat())
for timestamp, values in sorted(kml_dict.items()):
if values.get('coords') is not None:
# This is a track; create a LineString.
dict_to_linestring(kml, timestamp, values)
else:
# This is a folder.
if merge_folder_tracks:
# Merge all folder tracks into a single LineString.
coords = [
track_coords
for ft, fv in sorted(values.items())
for track_coords in fv['coords']
]
# print(coords)
track_values = {'coords': coords, 'description': None}
dict_to_linestring(kml, timestamp, track_values)
else:
# Create a folder of LineStrings.
folder = kml.newfolder(
name=timestamp.strftime(TIMESTAMP_FORMAT)
)
for ftimestamp, fvalues in sorted(values.items()):
dict_to_linestring(folder, ftimestamp, fvalues)
if zipped:
kml.savekmz(output_file)
else:
kml.save(output_file)
print(f"Saved {filetype} to {output_file}!")
def gpx_to_dict(gpx_file):
"""
Reads the supplied GPX file and returns a dictionary with datetime
keys and descriptions/coordinates (lists of (lon,lat,ele) tuples) as
values. Any tracks or track segments in IGNORE_FILE will not be
included in the dictionary.
"""
print(f"Reading GPX from {gpx_file} …")
with open(gpx_file, 'r') as f:
gpx = gpxpy.parse(f)
ignore_trkseg = [isoparse(dt) for dt in IGNORE['trkseg']]
def filter_segments(segments):
"""Removes segments whose first point matches ignore list."""
try:
return [
seg for seg in segments
if seg.points[0].time not in ignore_trkseg
]
except AttributeError:
return segments
def merge_segments(segments, index=0):
""" Merges segments with small time gaps. """
segments = segments.copy()
if index + 1 == len(segments):
return segments
a,b = segments[index:index+2]
try:
timediff = b.points[0].time - a.points[-1].time
if timediff <= MAX_TIME:
a.points.extend(b.points)
del segments[index+1]
return merge_segments(segments, index=index)
else:
return merge_segments(segments, index=index+1)
except AttributeError:
return segments
track_dict = {}
for track in gpx.tracks:
print(f"Converting `{track.name}` …")
desc = track.description
track.segments = filter_segments(track.segments)
track.segments = merge_segments(track.segments)
for segment in track.segments:
try:
timestamp = segment.points[0].time.astimezone(timezone.utc)
except AttributeError:
timestamp = parse(track.name).astimezone(timezone.utc)
coords = list(
(p.longitude, p.latitude, p.elevation) for p in segment.points
)
if len(coords) >= MIN_POINTS:
track_dict[timestamp] = dict(coords=coords, description=desc)
return track_dict
def kml_to_dict(kml_file):
"""
Reads the supplied KML file and returns a dictionary with datetime
keys and descriptions/coordinates (lists of (lon,lat,ele) tuples) as
values. Can contain one level of subfolders as subdictionaries.
"""
print(f"Reading KML from {kml_file} …")
def placemarks_to_dict(node):
"""Parses all Placemark children of the node into a dict."""
output_dict = {}
for p in node.findall('Placemark', NSMAP):
timestamp = parse(p.find('TimeStamp/when', NSMAP).text)
timestamp = timestamp.astimezone(timezone.utc)
raw_coords = p.find('LineString/coordinates', NSMAP).text.strip()
coords = list(
tuple(
float(n) for n in c.split(",")
) for c in raw_coords.split(" ")
)
desc = p.find("description", NSMAP)
if desc is not None:
desc = desc.text.strip()
if len(coords) >= MIN_POINTS:
output_dict[timestamp] = dict(coords=coords, description=desc)
return output_dict
# The simplekml package does not parse KML files (it only creates
# them), so use lxml etree to parse the raw XML instead.
root = etree.parse(kml_file).getroot()
document = root.find('Document', NSMAP)
# Parse Document-level Placemarks.
track_dict = placemarks_to_dict(document)
# Parse Placemarks in Folders.
folders = document.findall('Folder', NSMAP)
for folder in folders:
folder_tracks = placemarks_to_dict(folder)
if len(folder_tracks) > 0:
track_dict[min(folder_tracks.keys())] = folder_tracks
return track_dict
def merge_tracks(existing_tracks, new_tracks):
"""
Merges a new track dict into an existing track dict. Tracks which
are already in existing tracks will not be overwritten by new
tracks.
"""
print("Merging new tracks into existing tracks …")
def flatten_keys(tracks):
keys = set()
for k, v in tracks.items():
if v.get('coords') is not None:
keys.add(k)
else:
for subk in v.keys():
keys.add(subk)
return keys
existing_keys = flatten_keys(existing_tracks)
new_keys = set(new_tracks.keys())
keys_to_merge = new_keys - existing_keys
tracks_to_merge = {
k:v
for k, v in new_tracks.items()
if k in keys_to_merge
}
# Merge existing tracks with gpx tracks. Existing tracks should
# override new tracks with same timestamp.
merged = {**tracks_to_merge, **existing_tracks}
# Filter out tracks with times matching ignore list.
ignore_trk = [isoparse(dt) for dt in IGNORE['trk']]
merged = {k:v for k,v in merged.items() if k not in ignore_trk}
print(f"{len(new_keys)} imported tracks")
print(f"{len(existing_keys)} existing tracks")
print(f"{len(flatten_keys(merged))} merged tracks")
return merged
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Update KML and import GPX")
parser.add_argument(
dest='gpx_file',
nargs='?',
default=None,
help="GPX file to import"
)
args = parser.parse_args()
try:
main(args)
except BaseException:
print(sys.exc_info()[0])
print(traceback.format_exc())
finally:
os.system("pause")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment