Skip to content

Instantly share code, notes, and snippets.

@obsessedcake
Created December 5, 2023 11:12
Show Gist options
  • Save obsessedcake/cc19eb4a3a3f843e4d8a642438955b2d to your computer and use it in GitHub Desktop.
2019-11-07
import io
import pickle
import os.path
import traceback
from apiclient.http import MediaIoBaseDownload
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from collections import defaultdict
from pathlib import Path
class GDriveSharedFolderApi:
    """Thin wrapper around the Google Drive v3 API for listing and
    downloading files from a shared drive/folder.

    Call ``init()`` once before using any other method; it performs the
    OAuth flow (or restores cached credentials) and builds the service.
    """

    # If modifying these scopes, delete the file token.pickle.
    SCOPES = ['https://www.googleapis.com/auth/drive']

    ''' Initialization '''

    def init(self):
        """Authenticate against Google Drive and build ``self.service``.

        Loads cached credentials from ``token.pickle`` when present,
        refreshes them if expired, and falls back to the interactive
        OAuth flow (reading ``credentials.json``) otherwise.
        """
        creds = self.__load_credentials()
        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                # BUG FIX: SCOPES is a class attribute, so a bare `SCOPES`
                # here raised NameError; it must be qualified.
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', self.SCOPES
                )
                creds = flow.run_local_server()
            self.__save_credentials(creds)
        self.service = build('drive', 'v3', credentials=creds)

    def __load_credentials(self):
        """Return credentials unpickled from ``token.pickle``, or None."""
        credentials = None
        # The file token.pickle stores the user's access and refresh tokens,
        # and is created automatically when the authorization flow completes
        # for the first time.
        if os.path.exists('token.pickle'):
            with open('token.pickle', 'rb') as token:
                credentials = pickle.load(token)
        return credentials

    def __save_credentials(self, credentials):
        # Save the credentials for the next run.
        # (Renamed from the original `__safe_credentials` typo.)
        with open('token.pickle', 'wb') as token:
            pickle.dump(credentials, token)

    ''' API wrappers '''

    def get_items(self, driver_or_folder_id, page_token=None, page_size=1000):
        """Return ALL direct children of *driver_or_folder_id*.

        BUG FIX: the original returned only the first page and discarded
        ``nextPageToken``, silently truncating folders with more than
        *page_size* entries. This version follows the pagination chain.

        Each item is a dict with ``id``, ``name``, ``mimeType`` and
        (for binary files) ``size``.
        """
        items = []
        while True:
            results = self.service.files().list(
                fields='nextPageToken, files(id, name, mimeType, size)',
                pageToken=page_token,
                pageSize=page_size,
                q=f"'{driver_or_folder_id}' in parents"
            ).execute()
            items.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                break
        return items

    def get_media(self, file_id):
        """Return a download request for the raw content of *file_id*."""
        return self.service.files().get_media(fileId=file_id)

    def is_folder(self, mime_type):
        """True when *mime_type* is the Google Drive folder MIME type."""
        return mime_type == 'application/vnd.google-apps.folder'
class Downloader:
    """Recursively mirrors a Google Drive folder tree to local disk.

    Progress is tracked in a pickled list of drive-relative paths
    (``file_names.bin`` inside the output folder) so an interrupted run
    can be resumed without re-downloading finished files.
    """

    def __init__(self, api):
        # api: a GDriveSharedFolderApi whose init() has already been called.
        self.api = api

    def download(self, output_folder, driver_id):
        """Download the whole tree rooted at *driver_id* into *output_folder*."""
        # Length of the local prefix (plus '/') stripped off when building
        # drive-relative paths; assumes *output_folder* has no trailing slash.
        self.base_folder_len = len(output_folder) + 1  # Plus '/'
        output_folder = Path(output_folder)
        self.current_folder = output_folder
        self.current_folder.mkdir(exist_ok=True, parents=True)
        self.__load_data(output_folder)
        try:
            self.__walk_through_drive(driver_id)
        except Exception:
            # Log and fall through so the progress file is still saved.
            print(traceback.format_exc())
        self.__save_data(output_folder)

    ''' Helpful data '''

    def __load_data(self, output_folder):
        # Restore the set of already-downloaded drive paths, if any.
        file = output_folder / 'file_names.bin'
        if file.exists():
            with file.open('rb') as f:
                # Stored on disk as a list; kept in memory as a set so the
                # membership test in __handle_file is O(1) instead of O(n).
                self.file_names = set(pickle.load(f))
        else:
            self.file_names = set()

    def __save_data(self, output_folder):
        with open(output_folder / 'file_names.bin', 'wb') as f:
            # Persist as a sorted list to keep the on-disk format stable.
            pickle.dump(sorted(self.file_names), f)

    ''' Recursive walker '''

    def __walk_through_drive(self, driver_or_folder_id):
        """Visit every child of *driver_or_folder_id*, depth-first."""
        items = self.api.get_items(driver_or_folder_id)
        items.sort(key=lambda i: i['name'])
        self.__fix_duplicates(items)
        for item in items:
            item_id = item['id']  # renamed: `id` shadowed the builtin
            name = item['name']
            if self.api.is_folder(item['mimeType']):
                self.__handle_folder(item_id, name)
            else:
                # 'size' is absent for Google-native docs; treat them as 0B
                # so they are skipped below. (Temp hack for doc files.)
                self.__handle_file(item_id, name, int(item.get('size', 0)))

    def __handle_folder(self, folder_id, folder_name):
        self.current_folder /= folder_name
        try:
            # Yes, somehow this can still throw (e.g. characters in the
            # drive folder name that are illegal on the local filesystem).
            self.current_folder.mkdir(exist_ok=True, parents=True)
        except OSError:
            pass
        self.__walk_through_drive(folder_id)
        self.current_folder = self.current_folder.parent

    def __handle_file(self, file_id, file_name, file_size):
        real_file_path = self.current_folder / file_name
        drive_path = self.__get_drive_path(real_file_path)
        if drive_path in self.file_names:
            print(f"'{drive_path}' file was skipped because it is already downloaded.")
            return
        if file_size == 0:
            print(f"'{drive_path}' file was skipped because it has 0B size.")
            return
        try:
            self.__download_file(real_file_path, drive_path, file_id)
        except Exception:
            print(f"Failed to download the '{drive_path}' file.")
            print(traceback.format_exc())
            # BUG FIX: remove the partially-written file so a rerun
            # re-attempts it instead of leaving a truncated copy behind.
            try:
                if real_file_path.exists():
                    real_file_path.unlink()
            except OSError:
                pass
            return
        self.file_names.add(drive_path)

    ''' File downloader '''

    def __download_file_in_memory(self, real_file_path, drive_path, file_id):
        # NOTE(review): unused alternative to __download_file that buffers
        # the whole file in RAM before writing it out; kept for reference.
        target = self.api.get_media(file_id)
        buffer = io.BytesIO()
        downloader = MediaIoBaseDownload(buffer, target)
        done = False
        while not done:
            status, done = downloader.next_chunk()
            print(f"Downloaded {int(status.progress() * 100)}% of '{drive_path}' file...\r", end='', flush=True)
        print(f"Finished downloading '{drive_path}' file.")
        with open(real_file_path, 'wb') as f:
            f.write(buffer.getvalue())

    def __download_file(self, real_file_path, drive_path, file_id):
        """Stream *file_id* chunk by chunk straight into *real_file_path*."""
        with open(real_file_path, 'wb') as f:
            target = self.api.get_media(file_id)
            downloader = MediaIoBaseDownload(f, target)
            done = False
            while not done:
                status, done = downloader.next_chunk()
                print(f"Downloaded {int(status.progress() * 100)}% of '{drive_path}' file...\r", end='', flush=True)
        print(f"Finished downloading '{drive_path}' file.")

    ''' Utils '''

    def __get_drive_path(self, path):
        # Strip "<output_folder>/" so stored paths are drive-relative.
        return str(path)[self.base_folder_len:]

    def __fix_duplicates(self, seq):
        """Disambiguate same-named siblings by appending their drive id.

        Google Drive allows several children with identical names; local
        filesystems do not, so duplicates would overwrite each other.
        Mutates *seq* in place.
        """
        # Collect the indices of every name.
        tally = defaultdict(list)
        for i, item in enumerate(seq):
            tally[item['name']].append(i)
        # Rename every member of each duplicated group.
        for locations in tally.values():
            if len(locations) > 1:
                for loc in locations:
                    item = seq[loc]
                    item['name'] = f"{item['name']}-{item['id']}"
if __name__ == '__main__':
    # Authenticate first, then mirror the drive. Replace the two
    # placeholder strings with a real output path and drive/folder id.
    drive_api = GDriveSharedFolderApi()
    drive_api.init()
    Downloader(drive_api).download('your_output_folder', 'your_driver_id')
@obsessedcake
Copy link
Author

From these SO threads:

Because there can be files with the same names:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment