Created
March 4, 2024 22:18
-
-
Save tijszwinkels/dbc1901274187e71119a0f0d907db743 to your computer and use it in GitHub Desktop.
teslamapillary.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
# Rewrite of the tesla2mapillary.py script | |
# | |
# Differences: | |
# - Processes in cwd, leaves the original file alone | |
# - Deletes tmp-data after uploading, requires much less disk-space | |
# - Allows to specify video-files using wildcards | |
import os | |
import sys | |
import argparse | |
import shutil | |
import subprocess | |
from datetime import datetime, timedelta, timezone | |
import pytz | |
TOOLS = 'mapillary_tools'  # mapillary_tools command (modify with path, version, etc. if needed)
def _camera_angle(video_file_name):
    """Return the --offset_angle value (degrees) for a Tesla clip, based on its file name."""
    if 'back' in video_file_name:
        return 180
    if 'left_repeater' in video_file_name:
        return -120
    if 'right_repeater' in video_file_name:
        return 120
    return 0


def process_video_file(video_file_path, gpx_file, mapillary_username, localtz):
    """Sample, geotag and upload one Tesla dashcam video with mapillary_tools.

    The video is copied into a ``tmp-<name>`` directory below the current
    working directory, sampled into frames, geotagged from ``gpx_file`` and
    uploaded. The temporary directory is always removed afterwards. On
    success an ``<name>-<hash>.done`` marker file is written to the current
    working directory so the same video/GPX combination is skipped next time.

    Args:
        video_file_path: Path to the video. Anything not ending in ``.mp4``
            is skipped. The file name must start with a timestamp such as
            ``2020-09-24_14-06-34``.
        gpx_file: Path to the GPX track used as the geotag source.
        mapillary_username: Value passed to mapillary_tools ``--user_name``.
        localtz: pytz timezone name in which the file-name timestamp is
            interpreted, e.g. ``Europe/Zurich``.

    Raises:
        ValueError: If the file name does not start with the expected
            timestamp pattern.
    """
    if not video_file_path.endswith('.mp4'):
        print('Skipping non-mp4 file: ' + video_file_path)
        return
    video_file_name = os.path.basename(video_file_path)

    # Check whether file has already been processed.
    # NOTE: hash() of a str is only stable between runs when PYTHONHASHSEED
    # is fixed; the marker file name depends on that.
    gpx_file_hash = hash(str(gpx_file))
    exists_filename = video_file_name + '-' + str(gpx_file_hash) + '.done'
    exists_path = os.path.join(os.getcwd(), exists_filename)
    if os.path.exists(exists_path):
        print('Skipping already processed file: ' + video_file_name)
        return

    # measure time
    start_time = datetime.now()

    # Get the recording start from the video file name.
    print("Processing: " + video_file_name)
    # Pattern: 2020-09-24_14-06-34-front.mp4
    tesla_datetime = datetime.strptime(video_file_name[0:19], "%Y-%m-%d_%H-%M-%S")
    print(video_file_name[0:19])
    tesla_datetime = pytz.timezone(localtz).localize(tesla_datetime)

    # mapillary_tools wants the start time in UTC with millisecond precision.
    video_start_time = tesla_datetime.astimezone(timezone.utc)
    formatted_video_start_time = video_start_time.strftime("%Y_%m_%d_%H_%M_%S_%f")[:-3]

    # Determine the camera angle from the file name only, so that directory
    # names (e.g. ".../backup/...") cannot influence the result.
    angle = _camera_angle(video_file_name)

    # Work on a copy inside ./tmp-<name> and leave the original file alone.
    tmp_dir = os.path.join(os.getcwd(), 'tmp-' + video_file_name)
    if not os.path.isdir(tmp_dir):
        os.mkdir(tmp_dir)

    uploaded = False
    try:
        copy_start_time = datetime.now()
        shutil.copyfile(video_file_path, os.path.join(tmp_dir, video_file_name))
        print("Copy took: " + str(datetime.now() - copy_start_time))

        # Call mapillary tools to split into frames, geotag, and upload
        print(formatted_video_start_time)
        print("Sampling video...")
        # 12 fps. We skip frames anyway if we've moved less than 5m.
        sample_result = subprocess.run([
            TOOLS, "sample_video",
            "--video_sample_distance", "-1",
            "--video_sample_interval", "0.083333",
            "--video_start_time", formatted_video_start_time,
            tmp_dir,
        ])
        if sample_result.returncode == 0:
            print("process_and_upload ...")
            upload_result = subprocess.run([
                TOOLS, "process_and_upload",
                "--skip_process_errors",
                "--user_name", mapillary_username,
                "--geotag_source", "gpx",
                "--geotag_source_path", gpx_file,
                "--overwrite_all_EXIF_tags",
                "--offset_angle", str(angle),
                "--device_make", "Tesla",
                tmp_dir,
            ])
            uploaded = upload_result.returncode == 0
    finally:
        # Always remove the tmp directory, even if copying or a tool call
        # raised, so failed runs do not leak disk space.
        remove_start_time = datetime.now()
        shutil.rmtree(tmp_dir)
        print("Remove took: " + str(datetime.now() - remove_start_time))

    if not uploaded:
        # Do not write the marker: the file must be retried on the next run.
        print(TOOLS + " failed for " + video_file_name + "; not marking it as processed")
        return

    # Create a file to mark the file as processed
    with open(exists_path, 'w') as f:
        f.write('done')

    # measure duration
    end_time = datetime.now()
    print("Processing " + video_file_name + " took: " + str(end_time - start_time))
def main():
    """Parse the command line and process every given video file.

    If PYTHONHASHSEED is not set, it is set to ``42`` and the interpreter is
    re-executed with the same arguments, so that ``hash()`` of a string gives
    the same value on every run.
    """
    # PYTHONHASHSEED magic to make hash consistent between runs
    # See: https://stackoverflow.com/questions/30585108/disable-hash-randomization-from-within-python-program/64138050#64138050
    if not os.getenv('PYTHONHASHSEED'):
        os.environ['PYTHONHASHSEED'] = '42'
        os.execv(sys.executable, [sys.executable] + sys.argv)

    # Parse arguments
    parser = argparse.ArgumentParser(
        description='Process Tesla dashcam videos with accompanying GPX file')
    parser.add_argument('mapillary_user', metavar='mapillary_user', type=str,
                        help='Mapillary username')
    # This positional is required; argparse never applies a default to it,
    # so none is declared and the help text does not promise one.
    parser.add_argument('timezone', metavar='timezone', type=str,
                        help='Timezone the Tesla ride was in, e.g. Europe/Zurich')
    parser.add_argument('gpxfilepath', metavar='gpxpath', type=str,
                        help='The gpx file')
    parser.add_argument('videofiles', metavar='videofiles', type=str, nargs='+',
                        help='The videofiles to process')
    args = parser.parse_args()

    for video_file in args.videofiles:
        process_video_file(video_file, args.gpxfilepath, args.mapillary_user, args.timezone)


# Only run when executed as a script, so importing this module has no side effects.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment