Skip to content

Instantly share code, notes, and snippets.

@petered
Last active June 26, 2022 18:33
Show Gist options
  • Save petered/da8ba1018a79e795119baaacc514da4d to your computer and use it in GitHub Desktop.
Save petered/da8ba1018a79e795119baaacc514da4d to your computer and use it in GitHub Desktop.
Syncs data from a DJI SD Card to your local file system, renaming files using modified time
import os
import shutil
import sys
from argparse import ArgumentParser
from datetime import datetime
from typing import Optional, Sequence
def modified_timestamp_to_filename(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp).strftime('dji_%Y-%m-%d_%H-%M-%S')
def get_dest_filepath(src_path: str, src_root_dir: str, dest_root_dir: str) -> str:
src_root_dir = src_root_dir.rstrip(os.sep) + os.sep
assert src_path.startswith(src_root_dir), f"File {src_path} was not in root dir {src_root_dir}"
src_rel_folder, src_filename = os.path.split(src_path[len(src_root_dir):])
src_name, ext = os.path.splitext(src_filename)
src_order_number = src_name.split('_', 1)[1] # 'DJI_0215' -> '0215'
timestamp = os.path.getmtime(src_path)
new_filename = f'{modified_timestamp_to_filename(timestamp)}_{src_order_number}{ext.lower()}'
return os.path.join(dest_root_dir, src_rel_folder, new_filename)
def iter_filepaths_in_directory_recursive(directory, allowed_extensions: Optional[Sequence[str]]):
allowed_extensions = tuple(e.lower() for e in allowed_extensions)
yield from (os.path.join(dp, f) for dp, dn, filenames in os.walk(directory)
for f in filenames if allowed_extensions is None or any(f.lower().endswith(e) for e in allowed_extensions))
def copy_creating_dir_if_needed(src_path: str, dest_path: str):
parent, _ = os.path.split(dest_path)
if not os.path.exists(parent):
os.makedirs(parent)
shutil.copyfile(src_path, dest_path)
def get_recursive_directory_contents_string(directory: str, indent_level=0, indent=' ', max_entries: Optional[int] = None) -> str:
lines = []
this_indent = indent * indent_level
for i, f in enumerate(os.listdir(directory)):
if max_entries is not None and i >= max_entries:
lines.append(this_indent + '...')
break
lines.append(this_indent + f)
fpath = os.path.join(directory, f)
if os.path.isdir(fpath):
lines.append(get_recursive_directory_contents_string(fpath, indent_level=indent_level + 1, max_entries=max_entries))
return '\n'.join(lines)
def sync_data_from_dji_sd_card(
drone_sd_card_dcim_folder='/Volumes/Untitled/DCIM', # DCIM folder on SD card (this path worked for me on Mac)
media_subfolder_name='100MEDIA', # This subfolder should exist if you have any recorded data on drone
pano_subfolder_name='PANORAMA', # This subfolder may exist if you've taken panos
destination_folder='~/dji_data', # Place to put the data (does not need to exist yet)
extensions_to_copy=('.mp4', '.jpg'), # File types to copy (here to avoid copying html files, etc)
overwrite: bool = False, # Overwrite existing files on machine
check_byte_sizes=True, # Check that, for existing files, file-size matches source. If not, overwrite.
include_pano_source_images=True, # Copy panorama source images too (note - does not copy generated panos, as those only exist on phone)
verbose: bool = True # Prints a lot.
):
"""
Syncs data from DJI SD card to a local destination folder, renaming files using modified date (GMT).
Does not re-copy files that already exist at the destination (unless override is selected or file-size does not match)
This is useful because DJI's original file names are just sequence numbers like DJI_0214 - which will reset once card is reformatted.
Files and folders are renamed using the modified date and original sequence number on the drone. E.g.
/Volumes/Untitled/DCIM/100MEDIA/DJI_0214.JPG -> /Users/peter/dji_data/dji_2022-06-15_20-50-18_0214.jpg
/Volumes/Untitled/DCIM/100MEDIA/DJI_0215.MP4 -> /Users/peter/dji_data/dji_2022-06-16_12-36-51_0215.mp4
^ Sequence Number ^ Date ^ Time ^ Sequence Number
Panorama source files (not - not the panos themselves, which only exist on the phone) will bave their folders renamed, e.g.
/Volumes/Untitled/DCIM/PANORAMA/100_0050/DJI_0015.JPG -> /Users/peter/dji_data/PANORAMA/dji_2022-04-09_10-03-28_0050/DJI_0015.JPG
/Volumes/Untitled/DCIM/PANORAMA/100_0050/DJI_0016.JPG -> /Users/peter/dji_data/PANORAMA/dji_2022-04-09_10-03-28_0050/DJI_0016.JPG
"""
assert os.path.isdir(drone_sd_card_dcim_folder), f"SDCard DCIM folder '{drone_sd_card_dcim_folder}' does not exist"
src_media_folder = os.path.join(drone_sd_card_dcim_folder, media_subfolder_name)
assert os.path.isdir(src_media_folder), f"Did not find folder {media_subfolder_name} in DCIM folder"
if verbose:
print(f'Data found in source DCIM folder {drone_sd_card_dcim_folder}:')
print(get_recursive_directory_contents_string(drone_sd_card_dcim_folder, max_entries=3, indent_level=1))
destination_folder = os.path.expanduser(destination_folder)
# Find media files in DCIM/100MEDIA and give them new names using modified date
src_paths = [os.path.join(src_media_folder, f) for f in os.listdir(src_media_folder) if not f.startswith('.') and any(f.lower().endswith(e) for e in extensions_to_copy)]
src_path_to_new_path = {src_path: get_dest_filepath(src_path=src_path, src_root_dir=src_media_folder, dest_root_dir=destination_folder)
for src_path in src_paths}
# Find pano-source photos in DCIM/PANORAMA and give each folder a new name using modified date
if include_pano_source_images:
pano_folder_path = os.path.join(drone_sd_card_dcim_folder, pano_subfolder_name)
if os.path.exists(pano_folder_path):
pano_dirs = [ppath for f in os.listdir(pano_folder_path) if os.path.isdir(ppath := os.path.join(pano_folder_path, f))]
for src_pano_dir_path in pano_dirs:
pano_dir_order_number = src_pano_dir_path.split('_')[-1]
new_pano_dir_name = modified_timestamp_to_filename(os.path.getmtime(src_pano_dir_path)) + '_' + pano_dir_order_number
for filename in os.listdir(src_pano_dir_path):
src_path_to_new_path[os.path.join(src_pano_dir_path, filename)] = os.path.join(destination_folder, pano_subfolder_name, new_pano_dir_name, filename)
# Filter to only copy when destination file does not exist. TODO: Maybe check file size match here too
src_path_to_size = {src_path: os.path.getsize(src_path) for src_path in src_path_to_new_path}
src_to_dest_to_copy = {src: dest for src, dest in src_path_to_new_path.items() if
overwrite or not os.path.exists(dest) or (check_byte_sizes and src_path_to_size[src] != os.path.getsize(dest))}
# Get file size data and prompt user to confirm copy
size_to_be_copied = sum(src_path_to_size[src] for src in src_to_dest_to_copy)
if verbose:
print('Files to be copied: ')
print(' ' + '\n '.join(f'{i}: {src} -> {dest} ' for i, (src, dest) in enumerate(src_to_dest_to_copy.items())))
response = input(
f"{len(src_to_dest_to_copy)}/{len(src_path_to_new_path)} files ({size_to_be_copied:,} bytes) in {src_media_folder} will be copied to {destination_folder}.\n Type 'copy' to copy >>")
# Do the actual copying.
if response.strip(' ') == 'copy':
print('Copying...')
data_copied = 0
for i, src_path in enumerate(sorted(src_to_dest_to_copy), start=1):
dest_path = src_to_dest_to_copy[src_path]
print(
f'Copied {i}/{len(src_to_dest_to_copy)} files ({data_copied / size_to_be_copied:.1%} of data). Next: {src_path} -> {dest_path} ({src_path_to_size[src_path]:,} B)')
copy_creating_dir_if_needed(src_path, dest_path)
data_copied += src_path_to_size[src_path]
print('Done copying')
if verbose:
print(f'Dest folder {destination_folder} now contains:')
print(get_recursive_directory_contents_string(destination_folder, max_entries=3, indent_level=1))
else:
print("You didn't type 'copy'")
if __name__ == '__main__':
# See docstring for sync_data_from_dji_sd_card function above
parser = ArgumentParser()
parser.add_argument('-s', '--src_dcim', help='DCIM folder from SD Card', default='/Volumes/Untitled/DCIM')
parser.add_argument('-d', '--dest', help='Destination folder to save data', default='~/dji_data')
parser.add_argument('-o', '--overwrite', help='Overwrite existing destination files from source', action='store_true')
args = parser.parse_args(sys.argv[1:])
sync_data_from_dji_sd_card(drone_sd_card_dcim_folder=args.src_dcim, destination_folder=args.dest, overwrite=args.overwrite)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment