Created
March 29, 2021 17:19
-
-
Save jkiang13/83bcf0aabf0f7f426762cdb36085ea07 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Command line script that recursively uploads a local folder structure into a
specified Synapse container (a Project or Folder).

Usage, e.g.:
    python ./upload_directory_to_synapse /path/to/local/directory syn12345 [--followlinks]
""" | |
import argparse | |
import os | |
import synapseclient | |
from synapseclient.core.exceptions import SynapseError | |
from synapseclient.entity import is_container | |
from synapseclient.__main__ import login_with_prompt | |
def _store(syn, root_path, root_container_id, followlinks):
    """Mirror a local directory tree into a Synapse container.

    Walks ``root_path`` iteratively with an explicit stack. Every regular file
    is stored as a ``synapseclient.File`` and every sub-directory as a
    ``synapseclient.Folder`` under the container that mirrors its parent
    directory.

    Args:
        syn: Synapse client; only its ``store`` method is used here.
        root_path: Local directory whose contents are uploaded.
        root_container_id: Synapse ID of the container that receives the
            contents of ``root_path``.
        followlinks: If false, symbolic links (to files or directories) are
            skipped. If true they are followed, and each real directory is
            descended into at most once so that link cycles terminate.
    """
    stack = [(root_path, root_container_id)]
    # Real paths of directories already scheduled for traversal. Seeded with
    # the root so a link pointing back at it is not uploaded again as an
    # (empty) folder.
    seen_directories = {os.path.realpath(root_path)}

    while stack:
        dir_path, container_id = stack.pop()
        for name in os.listdir(dir_path):
            path = os.path.join(dir_path, name)

            if not followlinks and os.path.islink(path):
                continue

            if os.path.isfile(path):
                # Files cannot create cycles, so they are never de-duplicated:
                # a link to a file is uploaded under its own name regardless
                # of whether its target was uploaded elsewhere in the tree.
                syn.store(synapseclient.File(path, parentId=container_id))
            elif os.path.isdir(path):
                if followlinks:
                    real_path = os.path.realpath(path)
                    if real_path in seen_directories:
                        # Already visited this real directory; skipping it is
                        # what prevents infinite traversal through link loops.
                        continue
                    seen_directories.add(real_path)

                folder = syn.store(
                    synapseclient.Folder(name=name, parentId=container_id)
                )
                stack.append((path, folder.id))
def main():
    """Command line entry point: parse arguments, log in, and upload.

    Raises:
        ValueError: if the local path is not a directory, if the parent entity
            is not a Synapse container, or if retrieving the parent entity
            raised a ``SynapseError``.
    """
    cli = argparse.ArgumentParser(description="Recursively upload a local directory structure to Synapse")
    cli.add_argument('path', help='Local path of the directory to recursively upload')
    cli.add_argument('parent', help='Synapse ID of the parent container to upload to')
    cli.add_argument(
        '--followlinks',
        action='store_true',
        default=False,
        help='Whether symbolic links should be resolved',
    )
    options = cli.parse_args()

    local_dir = options.path
    parent_id = options.parent

    if not os.path.isdir(local_dir):
        raise ValueError(f"{local_dir} is not a directory")

    client = synapseclient.Synapse()
    # This will log in using cached credentials, or prompt if none are available.
    login_with_prompt(client, None, None)

    try:
        parent_entity = client.get(parent_id, downloadFile=False)
        if not is_container(parent_entity):
            raise ValueError(f"{parent_id} is not a Synapse Folder or Project")
    except SynapseError as err:
        message = f"Can't upload to {parent_id}, is it a Synapse container you have access to?"
        raise ValueError(message) from err

    _store(client, local_dir, parent_id, options.followlinks)
# Run the uploader only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment