Upload content of zipfiles to box.com programmatically
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
"""
Upload images inside zip files to box.com.
"""
import sys, time, webbrowser, json, zipfile, tempfile, urllib3 | |
from argparse import ArgumentParser | |
from pathlib import Path | |
from typing import Dict, List, Generator | |
from tqdm import tqdm | |
import requests | |
from boxsdk import ( | |
OAuth2, | |
Client, | |
BoxAPIException, | |
BoxOAuthException, | |
) | |
from boxsdk.object.file import File as BoxFile | |
from boxsdk.object.folder import Folder as BoxFolder | |
# print = tqdm.write

# JSON file with the box.com application credentials and OAuth tokens;
# read and rewritten by the authentication helpers in this module.
SECRET_FILE = Path("~/.imctransfer.auth.json").expanduser().absolute()
APP_REDIRECT_URL = "https://imctransfer.herokuapp.com/"
# ID of the box.com folder that `main` uses as the root of all uploads.
ROOT_DIR_ID = 129299923480
# Local record of uploaded files (see `load_db` / `save_db`).
DB_FILE = Path("metadata", "_upload_record.json")
# Text file to which `report_failure` appends one file name per line.
FAIL_DB = Path("metadata", "_upload_failures.txt")
# Retry bound checked by the upload loop in `main`.
max_retries = 3

# NOTE: the command line is parsed at import time and the resulting `args`
# namespace is read as a module global by the functions in this file.
parser = ArgumentParser()
for _list_option in ("--experiments", "--exclude"):
    parser.add_argument(_list_option, nargs="+", default=[])
parser.add_argument("--data-dir", default="~/Downloads", type=Path)
for _flag in ("--new-auth", "--verbose", "--write-to-disk"):
    parser.add_argument(_flag, action="store_true")
parser.add_argument("--dev-token", type=str)
# parser.add_argument("--dry-run", action="store_true")

args = parser.parse_args()
# Expand a leading "~" so the default "~/Downloads" resolves to a real path.
args.data_dir = args.data_dir.expanduser()
if args.verbose:
    print(f"CLI arguments: {args}")
def main() -> int:
    """
    Upload the files of every requested experiment archive to box.com.

    For each name in ``args.experiments`` the archive
    ``<args.data_dir>/<name>.zip`` is opened and each member that is not
    excluded, not a directory and not yet present in the local upload record
    is extracted (to a temporary file, or under ``data/`` when
    ``--write-to-disk`` is given) and uploaded to a box.com folder named after
    the experiment. The shared download URL of every uploaded file is stored
    in the local record, which is saved after each successful upload.

    Members that cannot be read from the archive, or whose upload is
    abandoned, are appended to the failure log via `report_failure`.

    Returns
    -------
    int
        Always 0.
    """
    client = get_client(args.new_auth)
    root_dir = BoxFolder(client.session, ROOT_DIR_ID)
    data_dir = Path("data")
    # if args.dry_run:
    #     return 0

    print("Uploading...")
    files: Dict[str, Dict[str, str]] = load_db()
    for exp in tqdm(args.experiments, desc="experiment"):
        if exp not in files:
            files[exp] = dict()
        # The context manager closes the archive even if an upload raises.
        with zipfile.ZipFile(args.data_dir / (exp + ".zip")) as zf:
            _dir = get_dir(exp, root_dir)
            for fn in tqdm(zf.namelist(), desc="image"):
                if fn in args.exclude:
                    continue
                # Skip already uploaded
                if fn in files[exp]:
                    if args.verbose:
                        print(f"{fn} is already uploaded.")
                    continue
                # Skip directories (last path component has no extension)
                if "." not in fn.split("/")[-1]:
                    continue

                # Read the member before creating any output file so that a
                # corrupt member does not leave a truncated file behind.
                try:
                    with zf.open(fn) as member:
                        content = member.read()
                except (zipfile.BadZipFile, zipfile.zlib.error):
                    time.sleep(1)
                    print(f"Failed reading of file: '{fn}'")
                    report_failure(fn)
                    continue

                tmp = None
                try:
                    # Select path to use (either data directory or temporary)
                    if not args.write_to_disk:
                        tmp = tempfile.NamedTemporaryFile()
                        path = Path(tmp.name)
                    else:
                        path = data_dir / fn
                        path.parents[0].mkdir(parents=True, exist_ok=True)
                    with open(path.as_posix(), "wb") as buff:
                        buff.write(content)

                    # Get subdir to upload to
                    d = get_dir(fn.split("/")[1], _dir) if "/" in fn else _dir

                    # Upload. The loop ends on success (`uf` is set) or as
                    # soon as the upload is abandoned (`give_up`); previously
                    # an abandoned upload kept retrying forever.
                    uf = None
                    give_up = False
                    retry = 0
                    while uf is None and not give_up:
                        try:
                            uf = d.upload(path, Path(fn).name)
                        except (BoxAPIException, BoxOAuthException) as e:
                            time.sleep(1)
                            print(fn)
                            if e.message == "Refresh token has expired":
                                # Refresh the token and rebuild every handle
                                # on top of the new client session.
                                refresh_token()
                                client = get_client()
                                root_dir = BoxFolder(client.session, ROOT_DIR_ID)
                                _dir = BoxFolder(client.session, _dir.id)
                                d = BoxFolder(client.session, d.id)
                            elif e.message == "Item with the same name already exists":
                                if args.verbose:
                                    print(
                                        "Item already uploaded but not in local records. Adding."
                                    )
                                # Get object id and add it to db
                                f_id = e.context_info["conflicts"]["id"]
                                uf = BoxFile(client.session, f_id)
                                # TODO: check checksum?
                            elif e.message == "Internal Server Error":
                                print(fn)
                                give_up = True
                            else:
                                time.sleep(5)
                            # Bound the number of attempts per file.
                            if retry >= max_retries:
                                give_up = True
                            retry += 1
                        except (
                            requests.exceptions.ConnectionError,
                            urllib3.exceptions.NewConnectionError,
                        ):
                            give_up = True

                    if uf is None:
                        # Abandoned: record it so it can be retried later.
                        report_failure(fn)
                        continue

                    # Get URL as value for file
                    files[exp][fn] = uf.get_shared_link_download_url()
                    # Persist after every upload so progress survives a crash.
                    save_db(files)
                finally:
                    # Closing the temporary file also deletes it.
                    if tmp is not None:
                        tmp.close()
    return 0
def refresh_token() -> None:
    """
    Exchange the stored refresh token for a new access/refresh token pair.

    The contents of ``SECRET_FILE`` plus ``grant_type=refresh_token`` are
    posted to the box.com token endpoint, and the returned ``access_token``
    and ``refresh_token`` are written back to ``SECRET_FILE``.

    Raises
    ------
    ValueError
        If the endpoint does not answer with a success status. The message
        contains the decoded JSON error body.
    """
    time.sleep(1)
    print("Refreshing access token...")
    with open(SECRET_FILE, "r") as handle:
        params = json.load(handle)

    # Build the request payload separately so that "grant_type" is not
    # written back to the secrets file below.
    payload = dict(params, grant_type="refresh_token")
    resp = requests.post("https://api.box.com/oauth2/token", data=payload)
    if not resp.ok:
        print("Failure! Could not refresh token.")
        raise ValueError(f"Could not refresh token: {resp.json()}")

    new = resp.json()
    print(f"Success! New token expires in {new['expires_in']} seconds.")
    params["access_token"] = new["access_token"]
    params["refresh_token"] = new["refresh_token"]
    with open(SECRET_FILE, "w") as handle:
        json.dump(params, handle, indent=4)
def get_client(force_reconnect=False) -> Client:
    """
    Build a box.com `Client` from the credentials stored in ``SECRET_FILE``.

    Parameters
    ----------
    force_reconnect
        If true, run the interactive authorization (`new_auth`) instead of
        refreshing the stored token. Ignored when ``--dev-token`` was given.

    Behaviour
    ---------
    - With ``--dev-token``: the stored client id/secret are combined with the
      developer token as access token.
    - Otherwise the stored token is refreshed; if that fails because no
      refresh token is stored or it has expired, `new_auth` is run.

    Raises
    ------
    ValueError
        If refreshing the token fails for any other reason.
    """
    print("Authenticating with box.com.")
    if args.dev_token is not None:
        print("Using developer token.")
        with open(SECRET_FILE, "r") as handle:
            stored = json.load(handle)
        secret_params = dict(
            client_id=stored["client_id"],
            client_secret=stored["client_secret"],
            # Was misspelled "acess_token", which OAuth2 rejects.
            access_token=args.dev_token,
        )
    else:
        if force_reconnect:
            new_auth()
        else:
            try:
                refresh_token()
            except ValueError as e:
                msg = str(e.args[-1]) if e.args else ""
                if (
                    'No "refresh_token" parameter found' in msg
                    or "Refresh token has expired" in msg
                ):
                    new_auth()
                else:
                    # Unknown failure: surface it instead of continuing
                    # without credentials.
                    raise
        # Both paths above leave fresh tokens in the secrets file.
        with open(SECRET_FILE, "r") as handle:
            secret_params = json.load(handle)

    oauth = OAuth2(**secret_params)
    # NOTE: no request is issued here, so invalid credentials only show up
    # on the first API call made with the returned client.
    return Client(oauth)
def new_auth() -> None:
    """
    Run the interactive OAuth authorization and store the resulting tokens.

    Any stored access/refresh token is discarded, the authorization URL is
    opened in the web browser and the user is prompted for the code shown
    there. The new ``access_token`` and ``refresh_token`` are written to
    ``SECRET_FILE`` together with the remaining stored parameters.
    """
    with open(SECRET_FILE, "r") as handle:
        secret_params = json.load(handle)

    # New user OAuth: start without any previously stored tokens.
    secret_params.pop("access_token", None)
    secret_params.pop("refresh_token", None)

    oauth = OAuth2(**secret_params)
    auth_url, _csrf_token = oauth.get_authorization_url(APP_REDIRECT_URL)
    print(
        "Please copy the code given in the browser webpage and paste the code here."
    )
    time.sleep(2)
    webbrowser.open(auth_url)
    time.sleep(1)

    access_token, refresh = oauth.authenticate(
        input("Please enter the code here: ")
    )
    secret_params["access_token"] = access_token
    secret_params["refresh_token"] = refresh
    with open(SECRET_FILE, "w") as handle:
        json.dump(secret_params, handle, indent=4)
def report_failure(file: str) -> None:
    """
    Append `file` as a new line to the failure log ``FAIL_DB``.

    The directory of the log is created when missing, so that recording a
    failure cannot itself abort the run with ``FileNotFoundError``.
    """
    FAIL_DB.parent.mkdir(parents=True, exist_ok=True)
    with open(FAIL_DB, "a") as handle:
        handle.write(file + "\n")
def get_dir(name: str, base: BoxFolder) -> BoxFolder:
    """
    Return the item called `name` inside `base`, creating a subfolder if absent.

    Parameters
    ----------
    name
        Name to look for among the items of `base`.
    base
        Folder to search in.

    Returns
    -------
    The first item of `base` whose name equals `name`; if there is none, the
    subfolder newly created with that name.
    """
    # Stop at the first match instead of listing the whole folder first.
    for item in base.get_items():
        if item.name == name:
            return item
    return base.create_subfolder(name)
def list_files_from_folder(
    dir_list: List[BoxFolder],
) -> Generator[str, None, None]:
    """
    Yield the "/"-joined path of every file below the last folder of `dir_list`.

    `dir_list` is used as the stack of folders leading to the folder being
    listed: subfolders are pushed before recursing into them and every call
    pops the folder it listed once it is exhausted. Consequently the folder
    passed in by the caller is removed from `dir_list` when the generator
    has been fully consumed.

    Yields
    ------
    str
        The names of all folders in the stack followed by the file name,
        joined with "/".
    """
    # Path of the current folder; resolved once, on the first file found,
    # instead of re-fetching every ancestor for every single file.
    prefix = None
    for item in dir_list[-1].get_items():
        file_info = item.get()
        if file_info.type == "folder":
            dir_list.append(item)
            yield from list_files_from_folder(dir_list)
        else:
            if prefix is None:
                prefix = "/".join([x.get().name for x in dir_list])
            # Reuse the information fetched above rather than fetching again.
            yield prefix + "/" + file_info.name
    dir_list.pop()
def load_db() -> Dict[str, Dict[str, str]]:
    """
    Load the local upload record from ``DB_FILE``.

    Returns
    -------
    The decoded JSON content, or an empty dictionary when the file cannot be
    read or does not contain valid JSON.
    """
    try:
        with open(DB_FILE, "r") as handle:
            return json.load(handle)
    # OSError: missing/unreadable file; ValueError: invalid JSON or encoding.
    # Narrower than a bare `except`, which also swallowed KeyboardInterrupt.
    except (OSError, ValueError):
        return dict()
def save_db(files: Dict[str, Dict[str, str]]) -> None:
    """
    Write the upload record `files` to ``DB_FILE`` as indented JSON.

    The record is first written to a sibling temporary file which then
    replaces ``DB_FILE``, so an interruption while writing cannot leave a
    truncated record behind. The target directory is created when missing.
    """
    DB_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = DB_FILE.with_name(DB_FILE.name + ".tmp")
    with open(tmp_file, "w") as handle:
        json.dump(files, handle, indent=4)
    tmp_file.replace(DB_FILE)
def get_files_from_box(root_dir: BoxFolder) -> Dict[str, Dict[str, str]]:
    """
    Collect the shared download URLs of the files stored below `root_dir`.

    Every folder directly inside `root_dir` is treated as one experiment.
    Files are collected up to three levels below the experiment folder.

    Returns
    -------
    A dictionary mapping each experiment folder name to a dictionary of
    ``"<experiment>/<...>/<file name>"`` -> shared download URL.
    """
    res: Dict[str, Dict[str, str]] = dict()
    for exp in tqdm(list(root_dir.get_items())):
        if exp.type != "folder":
            continue
        res[exp.name] = dict()
        _collect_urls(exp, exp.name, res[exp.name], depth=3)
    return res


def _collect_urls(folder, prefix: str, out: Dict[str, str], depth: int) -> None:
    """
    Add the URLs of the files in `folder` to `out`, descending `depth` levels.

    Keys are `prefix` joined with the item names by "/". Folders found on the
    last level are not entered.
    """
    for item in tqdm(list(folder.get_items())):
        path = prefix + "/" + item.name
        # The original tested the experiment's type here instead of the
        # item's, so files at the first level were treated as folders.
        if item.type == "file":
            out[path] = item.get_shared_link_download_url()
        elif item.type == "folder" and depth > 1:
            _collect_urls(item, path, out, depth - 1)
def get_urls(obj, files=None, prefix=""):
    """
    Recursive version of `get_files_from_box`.

    Parameters
    ----------
    obj
        A `BoxFile`, or an object providing ``get_items()`` (a folder).
    files
        Dictionary to add the results to; a new one is created when omitted.
    prefix
        String prepended to the keys added for `obj`.

    Returns
    -------
    `files`, with one ``prefix + "/" + name`` -> shared download URL entry per
    file found.

    NOTE: when recursing, the item name is appended to the prefix and then
    again as the key's last component, so the key of a file found inside a
    folder contains its name twice.
    TODO: needs post-processing for output to have same shape.
    """
    # A fresh dictionary per call; a `{}` default would be shared between
    # calls and leak results from one call into the next.
    if files is None:
        files = {}
    if isinstance(obj, BoxFile):
        files[prefix + "/" + obj.name] = obj.get_shared_link_download_url()
        return files
    # elif obj.type == 'folder':
    print(f"Folder: {str(obj)}")
    for i, obj2 in enumerate(obj.get_items()):
        if i % 100 == 0:
            print(f"{str(obj)}: {i}")
        # The recursive call fills `files` in place.
        get_urls(obj2, files=files, prefix=prefix + "/" + obj2.name)
    return files
if __name__ == "__main__":
    # Exit with the status returned by `main`; a Ctrl-C ends the program
    # without a traceback and without an explicit exit status.
    try:
        exit_status = main()
    except KeyboardInterrupt:
        exit_status = None
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment