Last active
November 27, 2024 02:32
-
-
Save hiroshil/0ac3c8b12e7fa71ebb979e738f92086c to your computer and use it in GitHub Desktop.
Script to download files or folders from Google Drive using PyDrive2, with a tqdm progress bar and support for resumable downloads.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
from pathlib import Path | |
from tqdm import tqdm as Tqdm | |
from typing import Any, ClassVar | |
from googleapiclient import errors | |
from googleapiclient.http import MediaIoBaseDownload | |
from googleapiclient import _helpers as util | |
from pydrive2.auth import GoogleAuth, LoadAuth | |
from pydrive2.drive import GoogleDrive | |
from pydrive2.files import GoogleDriveFile, FileNotUploadedError, ApiRequestError | |
# Number of bytes requested per download chunk (100 MiB).
DEFAULT_CHUNK_SIZE = 100 * 1024 * 1024

# Layout of the tqdm progress bar. It is given explicitly because the bar's
# `total` is only assigned later, from the download progress callback.
BAR_FMT_DEFAULT = (
    "{percentage:3.0f}% {desc}|{bar}|"
    "{postfix}{n_fmt}/{total_fmt}"
    " [{elapsed}<{remaining}, {rate_fmt:>11}]"
)

# tqdm keyword arguments for a bar that counts binary-scaled bytes.
# Annotated as a plain dict: `ClassVar` is only meaningful inside a class
# body, and this is a module-level constant.
BYTES_DEFAULTS: dict[str, Any] = {
    "unit": "B",
    "unit_scale": True,
    "unit_divisor": 1024,
    "miniters": 1,
}
mode = 1  # 0 file, 1 folder (overridden when `param` is a recognised URL)
# FileID is the simple file hash, like 0B1NzlxZ5RpdKS0NOS0x0Ym9kR0U
param = "FileID_or_FolderID_or_URL"
LOCAL_PATH = Path("PATH_TO_SAVE")

# Length this script expects for a bare Drive ID.
# NOTE(review): the example ID in the comment above is shorter than 33
# characters and would be rejected by this rule -- confirm whether shorter
# IDs should be accepted.
_DRIVE_ID_LENGTH = 33

# Matches file and folder share links. Whatever follows the ID ("/view",
# "?usp=sharing", "?usp=drive_link", nothing at all, ...) is ignored; the
# look-ahead only ensures the ID is not longer than expected.
_DRIVE_URL_RE = re.compile(
    r"https://drive\.google\.com/"
    r"(?:file/d/(?P<file>[A-Za-z0-9_-]{33})"
    r"|drive/folders/(?P<folder>[A-Za-z0-9_-]{33}))"
    r"(?![A-Za-z0-9_-])"
)


def _parse_download_target(value, default_mode):
    """Work out what ``value`` refers to.

    :param value: a bare Drive ID or a Drive file/folder URL.
    :param default_mode: mode to use for a bare ID (0 file, 1 folder).
    :returns: ``(mode, drive_id)``, or ``None`` when ``value`` is neither a
        33-character ID nor a recognised URL.
    """
    if len(value) == _DRIVE_ID_LENGTH:
        return default_mode, value
    if len(value) < _DRIVE_ID_LENGTH:
        return None
    found = _DRIVE_URL_RE.search(value)
    if found is None:
        return None
    if found.group("file"):
        return 0, found.group("file")
    return 1, found.group("folder")


_target = _parse_download_target(param, mode)
err_fmt = _target is None
if err_fmt:
    print("String not found in the any format.")
    # `exit()` is only available when the `site` module injected it, and it
    # reported success; raise SystemExit directly with a failing status.
    raise SystemExit(1)
mode, DOWNLOAD_ID = _target
class MediaIoBaseDownloadResumable(MediaIoBaseDownload):
    """``MediaIoBaseDownload`` variant that can start at a byte offset.

    :param fd: writable binary file object receiving the downloaded bytes.
    :param request: the media request to execute chunk by chunk.
    :param chunksize: number of bytes requested per chunk.
    :param resume_byte_pos: number of bytes already present locally; a falsy
        value leaves the parent's initial progress untouched.
    """

    @util.positional(4)
    def __init__(self, fd, request, chunksize, resume_byte_pos=0):
        # `chunksize` is forwarded by keyword: the parent constructor is
        # declared with a positional-argument limit (presumably fd/request
        # only -- see the googleapiclient reference), and passing it
        # positionally trips that check.
        super().__init__(fd, request, chunksize=chunksize)
        if resume_byte_pos:
            # `_progress` is the parent's private transfer counter; seeding
            # it makes the download continue from that offset.
            self._progress = resume_byte_pos
@LoadAuth
def GetContentFileResumable(
    self,
    filename,
    mimetype=None,
    remove_bom=False,
    skipValidate=True,
    callback=None,
    resume_byte_pos=0,
    chunksize=DEFAULT_CHUNK_SIZE,
    acknowledge_abuse=False,
):
    """Save content of this file as a local file, optionally resuming.

    :param filename: name of the file to write to.
    :type filename: str
    :param mimetype: mimeType of the file; used for the export fallback
        (defaults to "text/plain" there).
    :type mimetype: str
    :param remove_bom: Whether to remove the byte order marking.
    :type remove_bom: bool
    :param skipValidate: accepted for signature compatibility; not used by
        this implementation.
    :type skipValidate: bool
    :param callback: passed two arguments: (total transferred, file size).
    :type param: callable
    :param resume_byte_pos: number of bytes already stored in ``filename``.
        When non-zero the file is opened for appending and the download
        starts at that offset.
    :type resume_byte_pos: int
    :param chunksize: chunksize in bytes (standard 100 MB(1024*1024*100))
    :type chunksize: int
    :param acknowledge_abuse: Acknowledging the risk and download file
        identified as abusive.
    :type acknowledge_abuse: bool
    :raises: ApiRequestError, FileNotUploadedError
    """
    files = self.auth.service.files()
    file_id = self.metadata.get("id") or self.get("id")
    if not file_id:
        raise FileNotUploadedError()

    def download(fd, request, start_at):
        # Pull the media chunk by chunk, reporting progress after each one.
        downloader = MediaIoBaseDownloadResumable(
            fd,
            self._WrapRequest(request),
            chunksize=chunksize,
            resume_byte_pos=start_at,
        )
        done = False
        while not done:
            status, done = downloader.next_chunk()
            if callback:
                callback(status.resumable_progress, status.total_size)

    mode = "a+b" if resume_byte_pos else "w+b"
    with open(filename, mode=mode) as fd:
        # Should use files.export_media instead of files.get_media if
        # metadata["mimeType"].startswith("application/vnd.google-apps.").
        # But that would first require a slow call to FetchMetadata().
        # We prefer to try-except for speed.
        try:
            download(
                fd,
                files.get_media(
                    fileId=file_id, acknowledgeAbuse=acknowledge_abuse
                ),
                resume_byte_pos,
            )
        except errors.HttpError as error:
            exc = ApiRequestError(error)
            if (
                exc.error["code"] != 403
                or exc.GetField("reason") != "fileNotDownloadable"
            ):
                raise exc from error

            mimetype = mimetype or "text/plain"
            # Restart the export from scratch: drop anything already in the
            # file (stale partial data, or bytes written by the failed
            # attempt) and do not reuse the resume offset, which described
            # the get_media stream rather than the exported document.
            fd.seek(0)
            fd.truncate()
            try:
                download(
                    fd,
                    files.export_media(fileId=file_id, mimeType=mimetype),
                    0,
                )
            except errors.HttpError as export_error:
                raise ApiRequestError(export_error) from export_error

        # NOTE(review): when the file was opened in append mode, writes
        # always land at the end of the file, so the in-place BOM removal
        # below may not behave as intended for a resumed download -- confirm.
        if mimetype == "text/plain" and remove_bom:
            fd.seek(0)
            bom = self._GetBOM(mimetype)
            if bom:
                self._RemovePrefix(fd, bom)
# Expose the resumable download as a method on every GoogleDriveFile.
GoogleDriveFile.GetContentFileResumable = GetContentFileResumable

# Any PyDrive2 authentication flow can be used here; this one keeps the
# client credentials in a local file between runs.
gauth = GoogleAuth()
gauth.LoadCredentialsFile("credentials.json")
if gauth.credentials is not None:
    if gauth.access_token_expired:
        # Saved credentials exist but the access token has expired.
        gauth.Refresh()
    else:
        # Saved credentials are still usable as they are.
        gauth.Authorize()
else:
    # Nothing saved yet: run the interactive local-webserver flow.
    # The two flow parameters come from this answer:
    # https://stackoverflow.com/questions/24419188/automating-pydrive-verification-process/55876179#55876179
    gauth.GetFlow()
    gauth.flow.params.update({'access_type': 'offline', 'approval_prompt': 'force'})
    gauth.LocalWebserverAuth()
# Persist whatever credentials are now active for the next run.
gauth.SaveCredentialsFile("credentials.json")

drive = GoogleDrive(gauth)
def update_p(bar, current, total, resume_pos=0):
    """Move a progress bar to an absolute position.

    :param bar: tqdm-like object exposing ``n``, ``total`` and ``update()``.
    :param current: amount transferred so far.
    :param total: overall size; when falsy (e.g. ``None`` because the size
        is not known yet) the bar's existing ``total`` is left untouched.
    :param resume_pos: multiple of ``total`` to add to ``current``; only
        applied when ``total`` is known.
    """
    if total:
        if resume_pos:
            # Guarded by `total`: multiplying by an unknown (None) size
            # would raise TypeError.
            current += resume_pos * total
        bar.total = total
    # tqdm's update() takes an increment, so convert the absolute position.
    bar.update(current - bar.n)
def get_single_file(gdrive_file, output, **kwargs):
    """Download one Drive file below ``LOCAL_PATH``, resuming if possible.

    :param gdrive_file: Drive file object providing ``"fileSize"`` and
        ``GetContentFileResumable``.
    :param output: destination path relative to ``LOCAL_PATH``.
    :param kwargs: extra keyword arguments forwarded to the tqdm bar.
    """
    # The size arrives as a string, so "0" would be truthy; compare numbers.
    remote_size = int(gdrive_file["fileSize"] or 0)
    to_file = LOCAL_PATH.joinpath(output)
    to_file.parent.mkdir(exist_ok=True, parents=True)

    if not remote_size:
        # PyDrive doesn't like downloading empty files
        # https://github.com/iterative/dvc/issues/4286
        with open(to_file, "w"):
            pass
        return

    local_size = to_file.stat().st_size if to_file.exists() else 0
    if local_size == remote_size:
        # Already complete; a ranged request starting at the end of the
        # file has nothing to return and would fail.
        return
    if local_size > remote_size:
        # The local file cannot be a prefix of the remote one: start over.
        local_size = 0

    # Seed the bar with the known size and the bytes already on disk so the
    # percentage and rate are meaningful from the first chunk; explicit
    # caller kwargs still win.
    bar_options = {"total": remote_size, "initial": local_size, **kwargs}
    with Tqdm(
        desc="",
        disable=False,
        # explicit `bar_format` as `total` may be replaced by `update_p`
        bar_format=BAR_FMT_DEFAULT,
        **bar_options
    ) as pbar:
        gdrive_file.GetContentFileResumable(
            to_file,
            # The downloader reports absolute progress (the resume offset is
            # already included), so no extra offset is passed to update_p.
            callback=lambda current, total: update_p(pbar, current, total),
            resume_byte_pos=local_size,
            chunksize=DEFAULT_CHUNK_SIZE,
        )
def list_files_recursive(drive, current_id, parent_folder=None):
    """Recursively lists files and subfolders in a given Google Drive folder.

    Args:
        drive: A GoogleDrive instance.
        current_id: The ID of the current folder.
        parent_folder: Relative path of the folders above the current one,
            or None when the current folder is the starting point.

    Yields:
        Tuples of (file, file_path): the listed file object and its path
        relative to the download root. The path includes the name of every
        folder from the starting folder down, except that the special
        'root' folder contributes no path component.
    """
    # Skip items that are in the trash.
    query = "'{}' in parents and trashed=false".format(current_id)
    file_list = drive.ListFile({'q': query}).GetList()
    folder_info = drive.CreateFile({'id': current_id})
    folder_info.FetchMetadata(fields="title")
    current_folder = folder_info['title']

    # Accumulate the full relative path of this folder so that files nested
    # several levels deep keep all of their ancestors.
    if current_id == 'root':
        folder_path = parent_folder
    elif parent_folder:
        folder_path = f"{parent_folder}/{current_folder}"
    else:
        folder_path = current_folder

    for entry in sorted(file_list, key=lambda k: (k['mimeType'], k['title'])):
        if entry['mimeType'] == 'application/vnd.google-apps.folder':
            yield from list_files_recursive(drive, entry['id'], folder_path)
        elif folder_path:
            yield entry, f"{folder_path}/{entry['title']}"
        else:
            yield entry, entry['title']
# `BYTES_DEFAULTS` is forwarded to the progress bar so that it counts in
# binary-scaled bytes (there is no module-level `kwargs` to pass along).
if mode:
    # Folder mode: walk the folder tree and fetch every file found in it.
    for gdrive_file, file_path in list_files_recursive(drive, DOWNLOAD_ID):
        print('Downloading {} from Drive'.format(gdrive_file['title']))
        get_single_file(gdrive_file, file_path, **BYTES_DEFAULTS)
else:
    # don't mind. it does not create a file on the remote
    gdrive_file = drive.CreateFile({"id": DOWNLOAD_ID})
    gdrive_file.FetchMetadata(fields="title,mimeType,modifiedDate,description,md5Checksum,fileSize")
    print('Downloading {} from Drive'.format(gdrive_file['title']))
    get_single_file(gdrive_file, gdrive_file['title'], **BYTES_DEFAULTS)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment