Skip to content

Instantly share code, notes, and snippets.

@tijszwinkels
Created March 4, 2024 22:18
Show Gist options
  • Save tijszwinkels/dbc1901274187e71119a0f0d907db743 to your computer and use it in GitHub Desktop.
Save tijszwinkels/dbc1901274187e71119a0f0d907db743 to your computer and use it in GitHub Desktop.
teslamapillary.py
#!/usr/bin/python3
# Rewrite of the tesla2mapillary.py script
#
# Differences:
# - Processes in cwd, leaves the original file alone
# - Deletes tmp-data after uploading, requires much less disk-space
# - Allows to specify video-files using wildcards
import os
import sys
import argparse
import shutil
import subprocess
from datetime import datetime, timedelta, timezone
import pytz
TOOLS = 'mapillary_tools' #mapillary tools command (modify with path, version etc. if neeeded)
def process_video_file(video_file_path, gpx_file, mapillary_username, localtz):
if not video_file_path.endswith('.mp4'):
print('Skipping non-mp4 file: ' + video_file_path)
return
video_file_name = os.path.basename(video_file_path)
# Check whether file has already been processed
gpx_file_hash = hash(str(gpx_file))
exists_filename = video_file_name + '-' + str(gpx_file_hash) + '.done'
exists_path = os.path.join(os.getcwd(), exists_filename)
if os.path.exists(exists_path):
print('Skipping already processed file: ' + video_file_name)
return
# measure time
start_time = datetime.now()
# get the date from the video file name
print("Processing: " + video_file_name)
tesla_datetime = datetime.strptime(video_file_name[0:19], "%Y-%m-%d_%H-%M-%S") #Pattern: 2020-09-24_14-06-34-front.mp4
print(video_file_name[0:19])
mytz = pytz.timezone(localtz)
tesla_datetime = mytz.localize(tesla_datetime)
milliseconds_since_epoch = tesla_datetime.timestamp()*1000
# Determine the camera angle
angle = 0
if 'back' in video_file_path:
angle = 180
elif 'left_repeater' in video_file_path:
angle = -120
elif 'right_repeater' in video_file_path:
angle = 120
# Copy video_file to the current directory
# get ./tmp
tmp_dir = os.path.join(os.getcwd(), 'tmp-' + video_file_name)
if not os.path.isdir(tmp_dir):
os.mkdir(tmp_dir)
# copy video_file to ./tmp
copy_start_time = datetime.now()
dst = os.path.join(tmp_dir, video_file_name)
shutil.copyfile(video_file_path, dst)
print("Copy took: " + str(datetime.now() - copy_start_time))
# Call mapillary tools to split into frames, geotag, and upload
video_start_time = datetime.utcfromtimestamp(milliseconds_since_epoch / 1000)
formatted_video_start_time = video_start_time.strftime("%Y_%m_%d_%H_%M_%S_%f")[:-3]
print(formatted_video_start_time)
print("Sampling video...")
# 12 fps. We skip frames anyway if we've moved less than 5m.
subprocess.run([TOOLS, "sample_video", "--video_sample_distance", "-1", "--video_sample_interval", "0.083333", "--video_start_time", formatted_video_start_time, tmp_dir])
print("process_and_upload ...")
subprocess.run([TOOLS, "process_and_upload", "--skip_process_errors", "--user_name", mapillary_username , "--geotag_source", "gpx", "--geotag_source_path", gpx_file, "--overwrite_all_EXIF_tags", "--offset_angle", str(angle), "--device_make", "Tesla", tmp_dir])
# remove tmp directory
remove_start_time = datetime.now()
shutil.rmtree(tmp_dir)
print("Remove took: " + str(datetime.now() - remove_start_time))
# Create a file to mark the file as processed
with open(exists_path, 'w') as f:
f.write('done')
# measure duration
end_time = datetime.now()
print("Processing " + video_file_name + " took: " + str(end_time - start_time))
def main():
# PYTHONHASHSEED magic to make hash consistent between runs
# See: https://stackoverflow.com/questions/30585108/disable-hash-randomization-from-within-python-program/64138050#64138050
hashseed = os.getenv('PYTHONHASHSEED')
if not hashseed:
os.environ['PYTHONHASHSEED'] = '42'
os.execv(sys.executable, [sys.executable] + sys.argv)
# Parse arguments
my_parser = argparse.ArgumentParser(description='Process Tesla dashcam videos with accompanying GPX file')
my_parser.add_argument('mapillary_user', metavar='mapillary_user', type=str, help='Mapillary username')
my_parser.add_argument('timezone', metavar='timezone', type=str, default="Europe/Zurich", help='Timezone the Tesla ride was in. Default: Europe/Zurich')
my_parser.add_argument('gpxfilepath',metavar='gpxpath', type=str, help='The gpx file')
my_parser.add_argument('videofiles', metavar='videofiles', type=str, nargs='+', help='The videofiles to process')
args = my_parser.parse_args()
gpxpath = args.gpxfilepath
mapillary_user = args.mapillary_user
timezone = args.timezone
for file in args.videofiles:
process_video_file(file, args.gpxfilepath, args.mapillary_user, args.timezone)
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment