Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Upload the contents of zip files to box.com programmatically
#!/usr/bin/env python
"""
Upload images inside zip files to box.com.
"""
import sys, time, webbrowser, json, zipfile, tempfile, urllib3
from argparse import ArgumentParser
from pathlib import Path
from typing import Dict, List, Generator
from tqdm import tqdm
import requests
from boxsdk import (
OAuth2,
Client,
BoxAPIException,
BoxOAuthException,
)
from boxsdk.object.file import File as BoxFile
from boxsdk.object.folder import Folder as BoxFolder
# print = tqdm.write
# JSON file holding the box.com app credentials and OAuth tokens; the
# authentication helpers in this file read it and write refreshed tokens back.
SECRET_FILE = Path("~/.imctransfer.auth.json").expanduser().absolute()
# Redirect URL handed to the OAuth authorization request.
APP_REDIRECT_URL = "https://imctransfer.herokuapp.com/"
# Id of the box.com folder used as the root of all uploads.
ROOT_DIR_ID = 129299923480
# Local bookkeeping: record of uploaded files and list of unreadable members.
DB_FILE = Path("metadata") / "_upload_record.json"
FAIL_DB = Path("metadata") / "_upload_failures.txt"
# Number of retries granted to an upload that fails with an unrecognised error.
max_retries = 3

# Command line interface. Options are registered in a fixed order so that the
# help text and the printed namespace keep their layout.
parser = ArgumentParser()
for _option in ("--experiments", "--exclude"):
    # Both options take one or more names and default to a fresh empty list.
    parser.add_argument(_option, nargs="+", default=[])
parser.add_argument("--data-dir", default="~/Downloads", type=Path)
for _option in ("--new-auth", "--verbose", "--write-to-disk"):
    parser.add_argument(_option, action="store_true")
del _option
parser.add_argument("--dev-token", type=str)
# parser.add_argument("--dry-run", action="store_true")
args = parser.parse_args()
# "~" in the data directory is only expanded here, after parsing.
args.data_dir = args.data_dir.expanduser()
if args.verbose:
    print(f"CLI arguments: {args}")
def _extract_member(zf: zipfile.ZipFile, fn: str, path: Path) -> bool:
    """
    Write member `fn` of the open archive `zf` to `path`.

    Returns True on success. A member that cannot be read is printed, recorded
    through `report_failure` and signalled to the caller by returning False; in
    that case nothing is written to `path`.
    """
    try:
        # Close the member handle as soon as its bytes are in memory.
        with zf.open(fn) as member:
            content = member.read()
    except (zipfile.BadZipFile, zipfile.zlib.error):
        time.sleep(1)
        print(f"Failed reading of file: '{fn}'")
        report_failure(fn)
        return False
    with open(path.as_posix(), "wb") as buff:
        buff.write(content)
    return True


def main() -> int:
    """
    Upload the files inside each experiment's zip archive to box.com.

    For every name in `args.experiments` the archive `<data-dir>/<name>.zip` is
    opened, a box.com folder of the same name is looked up (or created) below
    the root folder, and each member that is not excluded, not a directory and
    not already in the local record is uploaded. The shared download URL of
    every uploaded file is stored in the local record, which is saved after
    each upload so that an interrupted run can resume.

    Files whose upload keeps failing are skipped (and therefore retried on the
    next run because they never enter the record).

    Returns:
        0 once all experiments were processed.
    """
    client = get_client(args.new_auth)
    root_dir = BoxFolder(client.session, ROOT_DIR_ID)
    data_dir = Path("data")
    excluded = set(args.exclude)
    # if args.dry_run:
    #     return 0

    print("Uploading...")
    files: Dict[str, Dict[str, str]] = load_db()
    for exp in tqdm(args.experiments, desc="experiment"):
        uploaded = files.setdefault(exp, dict())
        # The context manager closes the archive even if an upload raises.
        with zipfile.ZipFile(args.data_dir / (exp + ".zip")) as zf:
            _dir = get_dir(exp, root_dir)
            for fn in tqdm(zf.namelist(), desc="image"):
                if fn in excluded:
                    continue
                # Skip already uploaded
                if fn in uploaded:
                    if args.verbose:
                        print(f"{fn} is already uploaded.")
                    continue
                # Skip directories (any entry whose last component has no ".")
                if "." not in fn.split("/")[-1]:
                    continue

                # Select path to use (either data directory or temporary)
                tmp = None
                if args.write_to_disk:
                    path = data_dir / fn
                    path.parent.mkdir(parents=True, exist_ok=True)
                else:
                    tmp = tempfile.NamedTemporaryFile()
                    path = Path(tmp.name)

                try:
                    if not _extract_member(zf, fn, path):
                        continue

                    # Get subdir to upload to
                    d = get_dir(fn.split("/")[1], _dir) if "/" in fn else _dir

                    # Upload. The loop is bounded: the first try plus
                    # `max_retries` retries; `uf` stays None if all fail.
                    uf = None
                    for _attempt in range(max_retries + 1):
                        try:
                            uf = d.upload(path, Path(fn).name)
                            break
                        except (BoxAPIException, BoxOAuthException) as e:
                            time.sleep(1)
                            print(fn)
                            if e.message == "Refresh token has expired":
                                # get_client() refreshes the token itself and
                                # falls back to a new interactive authorization
                                # when the refresh token is no longer accepted.
                                client = get_client()
                                root_dir = BoxFolder(client.session, ROOT_DIR_ID)
                                # object_id is available even on folders that
                                # were re-created from a bare id earlier.
                                _dir = BoxFolder(client.session, _dir.object_id)
                                d = BoxFolder(client.session, d.object_id)
                            elif e.message == "Item with the same name already exists":
                                if args.verbose:
                                    print(
                                        "Item already uploaded but not in local records. Adding."
                                    )
                                # Get object id and add it to db
                                f_id = e.context_info["conflicts"]["id"]
                                uf = BoxFile(client.session, f_id)
                                # TODO: check checksum?
                                break
                            elif e.message == "Internal Server Error":
                                print(fn)
                                break
                            else:
                                # Unknown API error: wait longer, then retry.
                                time.sleep(5)
                        except (
                            requests.exceptions.ConnectionError,
                            urllib3.exceptions.NewConnectionError,
                        ):
                            break
                    if uf is None:
                        # Give up on this file for now.
                        continue

                    # Get URL as value for file
                    uploaded[fn] = uf.get_shared_link_download_url()
                    save_db(files)
                finally:
                    # Closing the temporary file also removes it from disk.
                    if tmp is not None:
                        tmp.close()
    return 0
def refresh_token() -> None:
    """
    Exchange the stored refresh token for a new access/refresh token pair.

    The parameters stored in `SECRET_FILE` are posted, together with
    `grant_type=refresh_token`, to the box.com token endpoint. On success the
    returned `access_token` and `refresh_token` replace the stored ones and the
    file is rewritten.

    Raises:
        ValueError: if the token endpoint does not answer with a success
            status; the message contains the JSON body of the response.
    """
    time.sleep(1)
    print("Refreshing access token...")
    with open(SECRET_FILE, "r") as handle:
        stored = json.load(handle)

    # `grant_type` is only part of the request, it is never written back.
    resp = requests.post(
        "https://api.box.com/oauth2/token",
        data={**stored, "grant_type": "refresh_token"},
    )
    if not resp.ok:
        print("Failure! Could not refresh token.")
        raise ValueError(f"Could not refresh token: {resp.json()}")

    new = resp.json()
    print(f"Success! New token expires in {new['expires_in']} seconds.")
    stored["access_token"] = new["access_token"]
    stored["refresh_token"] = new["refresh_token"]
    # Close the file explicitly so the new tokens are flushed to disk before
    # anybody reads them again.
    with open(SECRET_FILE, "w") as handle:
        json.dump(stored, handle, indent=4)
def get_client(force_reconnect=False) -> Client:
    """
    Build a box.com client from the credentials in `SECRET_FILE`.

    With `--dev-token` the stored client id/secret are combined with the given
    developer token. Otherwise the stored token is refreshed; if
    `force_reconnect` is true, or the refresh fails because no usable refresh
    token exists, a new interactive authorization is run first.

    Parameters:
        force_reconnect: skip the refresh and always run a new authorization.

    Raises:
        ValueError: if refreshing the token fails for a reason other than a
            missing or expired refresh token.

    NOTE: the credentials are not verified here; an invalid token only shows up
    on the first API call made with the returned client.
    """
    print("Authenticating with box.com.")
    if args.dev_token is not None:
        print("Using developer token.")
        with open(SECRET_FILE, "r") as handle:
            stored = json.load(handle)
        secret_params = dict(
            client_id=stored["client_id"],
            client_secret=stored["client_secret"],
            access_token=args.dev_token,
        )
    else:
        if not force_reconnect:
            try:
                refresh_token()
            except ValueError as e:
                msg = str(e)
                if (
                    'No "refresh_token" parameter found' not in msg
                    and "Refresh token has expired" not in msg
                ):
                    # Unknown failure: surface it instead of continuing
                    # without credentials.
                    raise
                force_reconnect = True
        if force_reconnect:
            new_auth()
        with open(SECRET_FILE, "r") as handle:
            secret_params = json.load(handle)
    oauth = OAuth2(**secret_params)
    return Client(oauth)
def new_auth():
    """
    Run a new interactive OAuth authorization and store the resulting tokens.

    The credentials in `SECRET_FILE` (without any previous tokens) are used to
    create an authorization URL, which is opened in the web browser. The user
    pastes the code shown there into the terminal; the access and refresh
    tokens obtained for that code are written back to `SECRET_FILE`.
    """
    with open(SECRET_FILE, "r") as handle:
        secret_params = json.load(handle)

    # New user OAuth: start without any previously stored tokens.
    secret_params.pop("access_token", None)
    secret_params.pop("refresh_token", None)

    oauth = OAuth2(**secret_params)
    auth_url, _csrf_token = oauth.get_authorization_url(APP_REDIRECT_URL)
    print(
        "Please copy the code given in the browser webpage and paste the code here."
    )
    time.sleep(2)
    webbrowser.open(auth_url)
    time.sleep(1)
    access_token, new_refresh_token = oauth.authenticate(
        input("Please enter the code here: ")
    )
    secret_params["access_token"] = access_token
    secret_params["refresh_token"] = new_refresh_token
    # Close the file explicitly so the tokens are on disk when they are read
    # back right afterwards.
    with open(SECRET_FILE, "w") as handle:
        json.dump(secret_params, handle, indent=4)
def report_failure(file: str) -> None:
    """
    Append `file` as one line to the failure log `FAIL_DB`.
    """
    # The log lives in a sub-directory that may not exist yet; recording a
    # failure must not itself abort the run.
    FAIL_DB.parent.mkdir(parents=True, exist_ok=True)
    with open(FAIL_DB, "a") as handle:
        handle.write(file + "\n")
def get_dir(name: str, base: BoxFolder) -> BoxFolder:
    """
    Return the item called `name` inside `base`, creating a subfolder if absent.

    The items of `base` are scanned lazily and the scan stops at the first item
    whose name matches, so no more of the listing is consumed than needed.
    If nothing matches, `base.create_subfolder(name)` is returned.
    """
    found = next((item for item in base.get_items() if item.name == name), None)
    if found is None:
        return base.create_subfolder(name)
    return found
def list_files_from_folder(
    dir_list: List[BoxFolder],
) -> Generator[str, None, None]:
    """
    Yield the "/"-joined path of every file below the last folder of `dir_list`.

    `dir_list` is the chain of folders from the starting point down to the
    folder to list; the names of all of them form the path prefix. Subfolders
    are descended into recursively. The list passed in is not modified.

    Raises:
        IndexError: if `dir_list` is empty.
    """
    # The prefix is the same for every file of this folder; it is looked up
    # once, and only if the folder actually contains a file.
    prefix = None
    for item in dir_list[-1].get_items():
        info = item.get()
        if info.type == "folder":
            # Pass an extended copy so the caller's list stays untouched.
            yield from list_files_from_folder(dir_list + [item])
        else:
            if prefix is None:
                prefix = "/".join([x.get().name for x in dir_list])
            yield prefix + "/" + info.name
def load_db() -> Dict[str, Dict[str, str]]:
    """
    Load the local upload record from `DB_FILE`.

    Returns the parsed JSON content, or an empty dict when the file cannot be
    read (e.g. it does not exist yet) or does not contain valid JSON.
    """
    try:
        with open(DB_FILE, "r") as handle:
            return json.load(handle)
    except (OSError, ValueError):
        # OSError: missing/unreadable file; ValueError: malformed JSON or
        # undecodable bytes. Anything else (e.g. Ctrl-C) is not swallowed.
        return dict()
def save_db(files: Dict[str, Dict[str, str]]) -> None:
    """
    Write the upload record `files` as indented JSON to `DB_FILE`.

    The directory of `DB_FILE` is created if needed. The data is first written
    to a sibling temporary file which then replaces `DB_FILE`, so an
    interruption during the write cannot leave a truncated record behind.
    """
    DB_FILE.parent.mkdir(parents=True, exist_ok=True)
    tmp_file = DB_FILE.with_name(DB_FILE.name + ".tmp")
    with open(tmp_file, "w") as handle:
        json.dump(files, handle, indent=4)
    tmp_file.replace(DB_FILE)
def _collect_box_urls(folder, prefix: str, out: Dict[str, str]) -> None:
    """
    Add the shared download URL of every file below `folder` to `out`.

    Keys are "/"-joined paths starting with `prefix`, the path of `folder`.
    Subfolders are visited recursively; items that are neither a file nor a
    folder are ignored.
    """
    for item in tqdm(list(folder.get_items())):
        path = prefix + "/" + item.name
        if item.type == "file":
            out[path] = item.get_shared_link_download_url()
        elif item.type == "folder":
            _collect_box_urls(item, path, out)


def get_files_from_box(root_dir: BoxFolder) -> Dict[str, Dict[str, str]]:
    """
    Build the upload record from what is stored on box.com.

    Every folder directly below `root_dir` is treated as an experiment. The
    result maps the experiment name to a dict whose keys are the paths of the
    files below it (starting with the experiment name, "/"-separated) and
    whose values are their shared download URLs. Items of `root_dir` that are
    not folders are skipped.
    """
    res: Dict[str, Dict[str, str]] = dict()
    for exp in tqdm(list(root_dir.get_items())):
        if exp.type != "folder":
            continue
        urls = res.setdefault(exp.name, dict())
        _collect_box_urls(exp, exp.name, urls)
    return res
def get_urls(obj, files=None, prefix=""):
    """
    Recursive version of `get_files_from_box`.

    Collects the shared download URL of `obj` (if it is a file) or of every
    file below it (otherwise) into one flat dict keyed by path. A file's key
    is `prefix + "/" + name`; for the items of a folder, `prefix` is extended
    with the names of the subfolders on the way down, so each name appears
    exactly once in a key.

    Parameters:
        obj: box.com file or folder to start from.
        files: dict to add the results to; a new dict is created when omitted.
        prefix: path under which `obj` is located.

    Returns:
        The dict the results were added to.

    TODO: needs post-processing for output to have same shape.
    """
    # A fresh dict per call: a `{}` default would be shared between calls.
    if files is None:
        files = {}
    if isinstance(obj, BoxFile):
        files[prefix + "/" + obj.name] = obj.get_shared_link_download_url()
        return files
    # elif obj.type == 'folder':
    print(f"Folder: {str(obj)}")
    for i, child in enumerate(obj.get_items()):
        if i % 100 == 0:
            print(f"{str(obj)}: {i}")
        # Only folders extend the prefix; a file appends its own name itself.
        if isinstance(child, BoxFile):
            child_prefix = prefix
        else:
            child_prefix = prefix + "/" + child.name
        get_urls(child, files=files, prefix=child_prefix)
    return files
if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        # Leave without a traceback, but with the conventional non-zero status
        # for a run cut short by Ctrl-C (128 + SIGINT) so callers can tell the
        # upload did not finish.
        sys.exit(130)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment