@nmichlo
Last active July 19, 2022 10:31
Script to start an export job for a google drive folder and then download it.
# MIT LICENSE
# Nathan Michlo
"""
Script to bypass rate limits for a GoogleDrive file
by creating an export job for the parent folder.
1. User manually specifies the file id of this parent folder
2. Script starts an export job for this folder
3. Script polls and waits for completion of the export job
- Script obtains download url from this export job
4. Download exported file
USAGE:
- export and download into current folder:
$ python google_drive_export.py --folder-id=<folder_id>
- export and download into current folder (VERBOSE):
$ python google_drive_export.py --folder-id=<folder_id> --verbose
- export and obtain download url
$ python google_drive_export.py --folder-id=<folder_id> --skip-download
- reuse previous export and download into current folder:
$ python google_drive_export.py --export-job-id=<export_job_id>
- reuse previous export and obtain download url:
$ python google_drive_export.py --export-job-id=<export_job_id> --skip-download
OBTAINING FOLDER ID:
- The <folder_id> can be obtained from the Google Drive UI:
(right click folder) > Get Link > Copy Link
- The <export_job_id> can only be obtained from this script, and is
intended for resuming/restarting previous exports to save on resources/time.
"""
import argparse
import json
import logging
import os
import shutil
import tempfile
import time
from pathlib import Path
from pprint import pprint
import pydantic
import requests
from tqdm import tqdm


class DownloadProgressBar(tqdm):
    def update_to(self, b=1, bsize=1, tsize=None):
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)


def download_file(url: str, save_path: str, chunk_size: int = 16384, **request_kwargs):
    # get the content size (requests and tqdm are already imported at module level)
    response = requests.get(url, stream=True, **request_kwargs)
    total_length = response.headers.get('content-length')
    # cast to integer if content-length exists on response
    if total_length is not None:
        total_length = int(total_length)
    # download with progress bar
    with tqdm(total=total_length, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
        print(f'Downloading: {url} to: {save_path}')
        # create a temporary directory to download into, then move to target location
        with tempfile.TemporaryDirectory(dir=Path(save_path).parent) as td:
            temp_path = os.path.join(td, Path(save_path).name)
            # download to temp file
            with open(temp_path, 'wb') as file:
                for data in response.iter_content(chunk_size=chunk_size):
                    file.write(data)
                    progress.update(len(data))  # the final chunk may be smaller than chunk_size
            # move temp file to target location
            shutil.move(temp_path, save_path)


class ExportArchive(pydantic.BaseModel):
    compressedSize: int
    fileName: str
    sizeOfContents: int
    storagePath: str


class GoogleDriveFolderExporter(object):

    _URL_EXPORTS = os.environ.get("GOOGLE_EXPORTS_URL", 'https://takeout-pa.clients6.google.com/v1/exports')
    _CREDENTIALS_FILE = os.environ.get("GOOGLE_CREDENTIALS_FILE", "credentials.json")
    _API_KEY_FILE = os.environ.get("GOOGLE_API_KEY_FILE", "api_key.json")

    def __init__(self, api_key: str = None, access_token: str = None):
        # get the api key for accessing google, obtain this from Google Cloud "APIs and Services":
        # - https://console.cloud.google.com/apis/credentials
        self._api_key = api_key
        if self._api_key is None:
            self._api_key = os.environ.get('GOOGLE_CLOUD_API_KEY', None)
        if self._api_key is None:
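            # assumed layout of api_key.json, inferred from the ['api_key'] lookup below
            # (illustrative only, not an official format):
            #   {"api_key": "<google-cloud-api-key>"}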
            with open(self._API_KEY_FILE, 'r') as file:
                self._api_key = json.load(file)['api_key']
        if self._api_key is None:
            raise RuntimeError(f'GOOGLE_CLOUD_API_KEY must be set, or {self._API_KEY_FILE} must be created.')
        # login with Google if needed
        # - if you use pydrive2 then you will need to download the client_secrets.json from
        #   https://console.cloud.google.com/apis/credentials
        self._access_token = access_token
        if self._access_token is None:
            self._access_token = os.environ.get('GOOGLE_ACCESS_TOKEN', None)
        if self._access_token is None:
            self._access_token = self._drive_login_get_credentials().access_token
        if self._access_token is None:
            raise RuntimeError(f'GOOGLE_ACCESS_TOKEN must be set, or {self._CREDENTIALS_FILE} must be created.')

    def _new_folder_export(self, folder_id: str, archive_prefix: str = None) -> str:
        """
        Start a new compression job for a specified google drive folder
        """
        response = requests.post(
            self._URL_EXPORTS,
            json={
                # 'archiveFormat': None, 'conversions': None, 'locale': None, 'archivePrefix': "papers",  # name of folder
                'archivePrefix': archive_prefix,
                'items': [{'id': folder_id}],
            },
            headers={'Authorization': self._access_token},
            params={'key': self._api_key}
        )
        response = response.json()
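        # assumed response shape, inferred from the lookup below (not an official schema):
        #   {"exportJob": {"id": "<export_job_id>", ...}}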
        export_id = response['exportJob']['id']
        print(f'new export job started: {export_id}')
        return export_id

    def _query_folder_export(self, export_id: str):
        """
        Query the status of a previously started compression job.
        - If it is finished, then a download url and file details are returned
        - This endpoint is intended to be polled repeatedly on a rate limit -- this is how the google drive web interface does it
        """
        print(f'querying export job: {export_id}')
        response = requests.get(
            f"{self._URL_EXPORTS}/{export_id}",
            headers={'Authorization': self._access_token},
            params={'key': self._api_key}
        )
        response = response.json()
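        # assumed response shape once the job is done, inferred from the lookups below
        # and the ExportArchive model (not an official schema):
        #   {"percentDone": 100,
        #    "exportJob": {"archives": [{"compressedSize": ..., "fileName": "...",
        #                                "sizeOfContents": ..., "storagePath": "<download_url>"}]}}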
        # get completion
        completion = response['percentDone']
        if completion != 100:
            print(f'job not done, still processing: {completion}%')
            return None
        # get link
        archives = response['exportJob']['archives']
        if len(archives) != 1:
            print(f'expected exactly one archive, but found: {len(archives)}')
        archive = ExportArchive(**archives[0])
        print(f'job is done, returning information: {archive}')
        pprint(archives)
        return archive

    def _wait_for_folder_export(self, export_job_id: str, wait_delay: float = 5, wait_retries: int = 1000, initial_delay: bool = True):
        """
        Poll multiple times to check if a job is done.
        -- This is how the google drive web interface does it
        """
        assert wait_delay >= 1
        assert wait_retries >= 1
        for i in range(wait_retries):
            if i != 0 or initial_delay:
                time.sleep(wait_delay)
            archive = self._query_folder_export(export_job_id)
            if archive is not None:
                return archive
        # all retries exhausted without the job completing
        return None

    def export_folder(
        self,
        folder_id: str = None,
        export_job_id: str = None,
        save_name: str = None,
        skip_download: bool = False,
        wait_delay: float = 3,
        wait_retries: int = 1000,
    ):
        """
        Start a compression job, wait for it to finish, and download the file from the resulting endpoint
        """
        # check arguments
        if (folder_id is None) and (export_job_id is None):
            raise ValueError('One of `folder_id` or `export_job_id` must be specified.')
        elif (folder_id is not None) and (export_job_id is not None):
            raise ValueError('Only one of `folder_id` or `export_job_id` may be specified.')
        # start an export job
        if export_job_id is None:
            export_job_id = self._new_folder_export(folder_id=folder_id, archive_prefix=None)
        # wait for the export job to complete
        archive = self._wait_for_folder_export(
            export_job_id=export_job_id,
            wait_delay=wait_delay,
            wait_retries=wait_retries,
            initial_delay=True,
        )
        if archive is None:
            raise RuntimeError(f'export job did not complete after {wait_retries} retries: {export_job_id}')
        # download the file if needed
        if not skip_download:
            download_file(archive.storagePath, save_name if save_name else archive.fileName)

    @classmethod
    def _drive_login_get_credentials(cls):
        """
        Login to google and obtain an access token using OAuth2
        - to use pydrive2, you will need to download the client_secrets.json from
          https://console.cloud.google.com/apis/credentials
        """
        import pydrive2.auth
        from oauth2client.client import OAuth2Credentials
        # get google authenticator
        gauth = pydrive2.auth.GoogleAuth()
        # Try to load saved client credentials
        gauth.LoadCredentialsFile(cls._CREDENTIALS_FILE)
        if (gauth.credentials is None) or gauth.access_token_expired:
            gauth.CommandLineAuth()
        else:
            gauth.Authorize()
        gauth.SaveCredentialsFile(cls._CREDENTIALS_FILE)
        creds: OAuth2Credentials = gauth.credentials
        # return the credentials
        return creds


def make_export_parser():
    parser = argparse.ArgumentParser()
    # general arguments
    parser.add_argument(
        "--folder-id", type=str, default=None, help="The `folder_id` obtained from the google drive UI (right click folder > get link > manually extract id from this url)"
    )
    parser.add_argument(
        "--export-job-id", type=str, default=None, help="The `export_job_id` obtained from running this script with --folder-id, intended to resume previous runs more efficiently without starting a new export job."
    )
    parser.add_argument(
        "--out-name", type=str, default=None, help="The name to use when saving the file, eg: `download.zip`"
    )
    parser.add_argument(
        "--skip-download", action="store_true", help="Do not download the exported folder, only obtain and print the download url."
    )
    parser.add_argument(
        "--verbose", action="store_true", help="Print extra download information."
    )
    return parser


def export_cli():
    args = make_export_parser().parse_args()
    # enable verbose mode
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
    # export the files
    exporter = GoogleDriveFolderExporter()
    exporter.export_folder(
        folder_id=args.folder_id,
        export_job_id=args.export_job_id,
        skip_download=args.skip_download,
        save_name=args.out_name,
    )


if __name__ == '__main__':
    export_cli()
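

# Programmatic usage sketch (illustrative only; the folder id, export job id, and output
# name below are placeholders, and credentials must already be configured via
# GOOGLE_CLOUD_API_KEY / GOOGLE_ACCESS_TOKEN or the api_key.json / credentials.json files
# described above):
#
#   exporter = GoogleDriveFolderExporter()
#   # start a new export job, wait for it, then download into the current folder:
#   exporter.export_folder(folder_id='<folder_id>', save_name='download.zip')
#   # or resume a previously started export job and only print the download url:
#   exporter.export_folder(export_job_id='<export_job_id>', skip_download=True)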