Last active
July 19, 2022 10:31
-
-
Save nmichlo/7b13ad39f9de45f43631409fe6db3bba to your computer and use it in GitHub Desktop.
Script to start an export job for a Google Drive folder and then download the resulting archive.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT LICENSE | |
# Nathan Michlo | |
""" | |
Script to bypass rate limits for a GoogleDrive file | |
by creating an export job for the parent folder. | |
1. User manually specifies the file id of this parent folder | |
2. Script starts an export job for this folder | |
3. Script polls and waits for completion of the export job | |
- Script obtains download url from this export job | |
4. Download exported file | |
USAGE: | |
- export and download into current folder: | |
$ python google_drive_export.py --folder-id=<folder_id> | |
- export and download into current folder (VERBOSE): | |
$ python google_drive_export.py --folder-id=<folder_id> --verbose | |
- export and obtain download url | |
$ python google_drive_export.py --folder-id=<folder_id> --skip-download | |
- reuse previous export and download into current folder: | |
$ python google_drive_export.py --export-job-id=<export_job_id> | |
- reuse previous export and obtain download url: | |
$ python google_drive_export.py --export-job-id=<export_job_id> --skip-download | |
OBTAINING FOLDER ID: | |
- The <folder_id> can be obtained from the Google Drive UI: | |
(right click folder) > Get Link > Copy Link | |
- The <export_job_id> can only be obtained from this script, and is | |
intended for resuming/restarting previous exports to save on resources/time. | |
""" | |
import argparse | |
import json | |
import logging | |
import os | |
import shutil | |
import tempfile | |
import time | |
from pathlib import Path | |
from pprint import pprint | |
import pydantic | |
import requests | |
from tqdm import tqdm | |
class DownloadProgressBar(tqdm):
    """A `tqdm` progress bar driven by absolute-position callbacks.

    `update_to` matches the (block count, block size, total size) shape
    used by urllib-style report hooks -- presumably intended for such a
    callback, though no caller is visible in this file.
    """

    def update_to(self, b=1, bsize=1, tsize=None):
        # adopt the total size as soon as the caller reports it
        if tsize is not None:
            self.total = tsize
        # tqdm wants an increment, so convert the absolute position
        # (b * bsize) into a delta from the current counter `self.n`
        downloaded = b * bsize
        self.update(downloaded - self.n)
def download_file(url: str, save_path: str, chunk_size: int = 16384, **request_kwargs):
    """
    Download `url` to `save_path`, showing a progress bar.

    The data is first written into a temporary directory next to the
    target location and only moved into place once the download has
    completed, so an interrupted download never leaves a partial file
    at `save_path`.

    :param url: the URL to download from.
    :param save_path: the final location of the downloaded file.
    :param chunk_size: number of bytes to request per read.
    :param request_kwargs: extra keyword arguments forwarded to `requests.get`.
    :raises requests.HTTPError: if the server responds with an error status.
    """
    # NOTE: uses the module-level `requests`/`tqdm` imports, the
    #       original redundantly re-imported them inside the function
    response = requests.get(url, stream=True, **request_kwargs)
    # FIX: fail early on HTTP errors instead of saving an error page to disk
    response.raise_for_status()
    # the content-length header may be absent (e.g. chunked transfer encoding)
    total_length = response.headers.get('content-length')
    if total_length is not None:
        total_length = int(total_length)
    # download with progress bar
    with tqdm(total=total_length, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
        print(f'Downloading: {url} to: {save_path}')
        # create a temporary directory to download into, then move to target location
        with tempfile.TemporaryDirectory(dir=Path(save_path).parent) as td:
            temp_path = os.path.join(td, Path(save_path).name)
            # download to temp file
            with open(temp_path, 'wb') as file:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    # FIX: advance by the bytes actually received -- the final
                    # chunk is usually smaller than `chunk_size`, so the
                    # original over-counted the progress total
                    progress.update(len(data))
            # move temp file to target location
            shutil.move(temp_path, save_path)
class ExportArchive(pydantic.BaseModel):
    """
    A single archive entry returned by the takeout export endpoint
    (one element of `response['exportJob']['archives']`).
    """
    # size of the produced archive -- presumably bytes, TODO confirm
    compressedSize: int
    # name of the produced archive file; used as the default save name when downloading
    fileName: str
    # total size of the archived contents -- presumably uncompressed bytes, TODO confirm
    sizeOfContents: int
    # download url of the archive (passed to `download_file`)
    storagePath: str
class GoogleDriveFolderExporter(object):
    """
    Export (zip) a Google Drive folder using the takeout endpoint, and
    optionally download the resulting archive.

    This bypasses per-file rate limits by asking Google to compress the
    whole parent folder server-side, the same way the Drive web UI does.

    Credentials are resolved from (in order): constructor arguments,
    environment variables, then local files / interactive login:
    - api key:      `GOOGLE_CLOUD_API_KEY` env var, or `api_key.json`
    - access token: `GOOGLE_ACCESS_TOKEN` env var, or a pydrive2 login
                    cached in `credentials.json`
    """

    # takeout endpoint used by the Drive web UI to export folders
    _URL_EXPORTS = os.environ.get("GOOGLE_EXPORTS_URL", 'https://takeout-pa.clients6.google.com/v1/exports')
    # cached OAuth2 credentials file written/read by pydrive2
    _CREDENTIALS_FILE = os.environ.get("GOOGLE_CREDENTIALS_FILE", "credentials.json")
    # json file of the form: {"api_key": "<key>"}
    _API_KEY_FILE = os.environ.get("GOOGLE_API_KEY_FILE", "api_key.json")

    def __init__(self, api_key: str = None, access_token: str = None):
        """
        :param api_key: Google Cloud API key, obtain this from "APIs and Services":
                        https://console.cloud.google.com/apis/credentials
        :param access_token: OAuth2 access token for the Google account.
        :raises RuntimeError: if either credential cannot be resolved.
        """
        # resolve the api key: argument > env var > json file
        self._api_key = api_key
        if self._api_key is None:
            self._api_key = os.environ.get('GOOGLE_CLOUD_API_KEY', None)
        if self._api_key is None:
            with open(self._API_KEY_FILE, 'r') as file:
                self._api_key = json.load(file)['api_key']
        if self._api_key is None:
            raise RuntimeError(f'GOOGLE_CLOUD_API_KEY must be set, or {self._API_KEY_FILE} must be created.')
        # resolve the access token: argument > env var > interactive login
        # - if you use pydrive2 then you will need to download the client_secrets.json from
        #   https://console.cloud.google.com/apis/credentials
        self._access_token = access_token
        if self._access_token is None:
            self._access_token = os.environ.get('GOOGLE_ACCESS_TOKEN', None)
        if self._access_token is None:
            self._access_token = self._drive_login_get_credentials().access_token
        if self._access_token is None:
            raise RuntimeError(f'GOOGLE_ACCESS_TOKEN must be set, or {self._CREDENTIALS_FILE} must be created.')

    def _new_folder_export(self, folder_id: str, archive_prefix: str = None) -> str:
        """
        Start a new server-side compression job for the specified drive folder.

        :param folder_id: id of the folder to export.
        :param archive_prefix: optional name prefix for the produced archive.
        :return: the id of the newly created export job.
        """
        response = requests.post(
            self._URL_EXPORTS,
            json={
                'archivePrefix': archive_prefix,  # e.g. "papers", the name of the folder
                'items': [{'id': folder_id}],
            },
            headers={'Authorization': self._access_token},
            params={'key': self._api_key},
        )
        # FIX: surface auth/quota errors directly, instead of a confusing
        #      KeyError on the missing 'exportJob' key below
        response.raise_for_status()
        export_id = response.json()['exportJob']['id']
        print(f'new export job started: {export_id}')
        return export_id

    def _query_folder_export(self, export_id: str):
        """
        Query whether a previously started compression job is finished.
        - If it is finished, then the download endpoint and file details are returned.
        - This should be called multiple times on a rate limit -- this is
          how google drive in the web interface does it.

        :return: the completed `ExportArchive` (download url in `storagePath`),
                 or `None` if the job is still processing.
        """
        print(f'querying export job: {export_id}')
        response = requests.get(
            f"{self._URL_EXPORTS}/{export_id}",
            headers={'Authorization': self._access_token},
            params={'key': self._api_key},
        )
        response.raise_for_status()
        response = response.json()
        # get completion
        completion = response['percentDone']
        if completion != 100:
            print(f'job not done, still processing: {completion}%')
            return None
        # get link
        archives = response['exportJob']['archives']
        # FIX: the original indexed archives[0] unconditionally and would
        #      raise a bare IndexError on an empty archive list
        if not archives:
            raise RuntimeError(f'export job completed, but no archives were produced: {export_id}')
        if len(archives) != 1:
            print('more than one archive found!')
        archive = ExportArchive(**archives[0])
        print(f'job is done, returning information: {archive}')
        pprint(archives)
        return archive

    def _wait_for_folder_export(self, export_job_id: str, wait_delay: float = 5, wait_retries: int = 1000, initial_delay: bool = True):
        """
        Poll multiple times to check if a job is done.
        -- This is how the google drive web interface does it.

        :return: the completed `ExportArchive`, or `None` if `wait_retries`
                 polls elapsed without the job completing.
        """
        assert wait_delay >= 1
        assert wait_retries >= 1
        for i in range(wait_retries):
            # optionally also sleep before the very first poll
            if i != 0 or initial_delay:
                time.sleep(wait_delay)
            archive = self._query_folder_export(export_job_id)
            if archive is not None:
                return archive
        return None  # FIX: explicit -- callers must handle the timeout case

    def export_folder(
        self,
        folder_id: str = None,
        export_job_id: str = None,
        save_name: str = None,
        skip_download: bool = False,
        wait_delay: float = 3,
        wait_retries: int = 1000,
    ):
        """
        Start a compression job, wait for it to finish, and download the
        file from the resulting endpoint.

        Exactly one of `folder_id` (start a new job) or `export_job_id`
        (resume/poll an existing job) must be given.

        :return: the completed `ExportArchive` describing the download.
        :raises ValueError: if not exactly one of `folder_id`/`export_job_id` is given.
        :raises TimeoutError: if the job does not finish within `wait_retries` polls.
        """
        # check arguments
        if (folder_id is None) and (export_job_id is None):
            raise ValueError('One of `folder_id` or `export_job_id` must be specified.')
        elif (folder_id is not None) and (export_job_id is not None):
            raise ValueError('Only one of `folder_id` or `export_job_id` must be specified.')
        # start an export job
        if export_job_id is None:
            export_job_id = self._new_folder_export(folder_id=folder_id, archive_prefix=None)
        # wait for the export job to complete
        archive = self._wait_for_folder_export(
            export_job_id=export_job_id,
            wait_delay=wait_delay,
            wait_retries=wait_retries,
            initial_delay=True,
        )
        # FIX: the original crashed with AttributeError on `archive.storagePath`
        #      when polling timed out -- fail with a clear error instead
        if archive is None:
            raise TimeoutError(f'export job did not complete after {wait_retries} polls: {export_job_id}')
        # download the file if needed
        if not skip_download:
            download_file(archive.storagePath, save_name if save_name else archive.fileName)
        # FIX: return the archive so callers can access the download url
        return archive

    @classmethod
    def _drive_login_get_credentials(cls):
        """
        Login to google and obtain an access token using OAuth2.
        - to use pydrive2 then you will need to download the client_secrets.json from
          https://console.cloud.google.com/apis/credentials
        """
        import pydrive2.auth
        from oauth2client.client import OAuth2Credentials
        # get google authenticator
        gauth = pydrive2.auth.GoogleAuth()
        # try to load saved client credentials, re-authenticating if missing/expired
        gauth.LoadCredentialsFile(cls._CREDENTIALS_FILE)
        if (gauth.credentials is None) or gauth.access_token_expired:
            gauth.CommandLineAuth()
        else:
            gauth.Authorize()
        # cache the (possibly refreshed) credentials for next time
        gauth.SaveCredentialsFile(cls._CREDENTIALS_FILE)
        creds: OAuth2Credentials = gauth.credentials
        # return the credentials
        return creds
def make_export_parser():
    """
    Build the command-line parser for the export script.

    Three string options (all defaulting to None) and two boolean flags;
    see each option's help text for its meaning.
    """
    parser = argparse.ArgumentParser()
    # string-valued options, all optional
    string_options = {
        "--folder-id": "The `folder_id` obtained from the google drive UI (right click folder > get link > manually extract id from this url)",
        "--export-job-id": "The `export_job_id` obtained from running this script with --folder-id, intended to resume previous runs more efficiently without starting a new export job.",
        "--out-name": "The name to use when saving the file, eg: `download.zip`",
    }
    for flag, description in string_options.items():
        parser.add_argument(flag, type=str, default=None, help=description)
    # boolean flags, off by default
    flag_options = {
        "--skip-download": "Do not download the exported folder, only obtain and print the download url.",
        "--verbose": "Print extra download information.",
    }
    for flag, description in flag_options.items():
        parser.add_argument(flag, action="store_true", help=description)
    return parser
def export_cli():
    """Entry point: parse CLI arguments and run the folder export/download."""
    arguments = make_export_parser().parse_args()
    # verbose mode simply lowers the global logging threshold
    log_level = logging.DEBUG if arguments.verbose else logging.INFO
    logging.basicConfig(level=log_level)
    # run the export (and the download, unless suppressed)
    exporter = GoogleDriveFolderExporter()
    exporter.export_folder(
        folder_id=arguments.folder_id,
        export_job_id=arguments.export_job_id,
        skip_download=arguments.skip_download,
        save_name=arguments.out_name,
    )


if __name__ == '__main__':
    export_cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment