Last active
March 15, 2023 13:39
-
-
Save muleyprasad/99dfaa57102bf9a02d5961f8f4f7c31f to your computer and use it in GitHub Desktop.
Upload, download, and delete files and folders from Google Drive using a service account.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io
import os
import ssl
from urllib.request import urlretrieve

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
# NOTE(review): this swaps the ssl module's default HTTPS context factory for the
# unverified one, so certificate checks are turned off for every caller in this
# process that relies on that default. Nothing visible in this script appears to
# need it -- confirm and remove if possible.
ssl._create_default_https_context = ssl._create_unverified_context
# Export formats for different file types: maps the MIME type of a Google
# document, spreadsheet or presentation to the Office Open XML MIME type it
# should be exported as.
export_formats = {
    'application/vnd.google-apps.document': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
    'application/vnd.google-apps.spreadsheet': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
    'application/vnd.google-apps.presentation': 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
}
# Download file function | |
# def download_file(file_id): | |
# try: | |
# file = drive_service.files().get(fileId=file_id).execute() | |
# if file['mimeType'].split('/')[0] in ['image', 'video']: | |
# request = drive_service.files().get_media(fileId=file_id) | |
# file_bytes = io.BytesIO(request.execute()) | |
# return file_bytes | |
# elif file['mimeType'] in export_formats: | |
# request = drive_service.files().export_media(fileId=file_id, mimeType=export_formats[file['mimeType']]) | |
# file_bytes = io.BytesIO(request.execute()) | |
# return file_bytes | |
# else: | |
# raise Exception(f"No download URL or export format available for {file['name']}") | |
# except HttpError as error: | |
# raise Exception(f"An error occurred: {error}") | |
def download_file(file_id):
    """Download a Drive file into memory.

    The file's metadata is looked up first. Google Docs, Sheets and Slides are
    exported to the Office format listed in ``export_formats``; every other
    file that is not a native Google Workspace type (images, videos, PDFs,
    archives, ...) is fetched as raw content.

    Args:
        file_id: ID of the Drive file to download.

    Returns:
        An ``io.BytesIO`` rewound to offset 0 that holds the content, or
        ``None`` (after printing an error) when the file can be neither
        downloaded nor exported, e.g. a folder.
    """
    # Native Workspace types carry no binary content of their own, so they can
    # only be exported, never fetched with get_media.
    workspace_prefix = 'application/vnd.google-apps.'

    metadata = drive_service.files().get(fileId=file_id).execute()
    mime_type = metadata['mimeType']

    if mime_type in export_formats:
        request = drive_service.files().export_media(
            fileId=file_id, mimeType=export_formats[mime_type])
    elif not mime_type.startswith(workspace_prefix):
        request = drive_service.files().get_media(fileId=file_id)
    else:
        print('Error: No download URL or export format available')
        return None

    # Single chunked-download loop shared by both request kinds.
    file_bytes = io.BytesIO()
    downloader = MediaIoBaseDownload(file_bytes, request)
    done = False
    while not done:
        status, done = downloader.next_chunk()
        print(f'Download {int(status.progress() * 100)}.')
    file_bytes.seek(0)
    return file_bytes
# Replace with the ID of the Google Drive folder you want to download files from
FOLDER_ID = 'YOUR_FOLDER_ID_HERE'
# Replace with the path to your local folder where you want to save the files
LOCAL_FOLDER_PATH = '.'
# Replace with the path to your service account credentials JSON file
SERVICE_ACCOUNT_FILE = 'service_account.json'

# Set up the Google Drive API client
creds = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE, scopes=['https://www.googleapis.com/auth/drive'])
drive_service = build('drive', 'v3', credentials=creds)

# List every non-trashed file and folder directly inside the folder
folder_query = f"'{FOLDER_ID}' in parents and trashed = false"
# Number of entries requested per page; the loop below fetches the remaining pages
page_size = 100
request = drive_service.files().list(
    q=folder_query, fields='nextPageToken, files(id, name, mimeType)', pageSize=page_size)

# Follow the pagination until list_next() reports there is no further page
results = []
while request is not None:
    response = request.execute()
    results.extend(response.get('files', []))
    request = drive_service.files().list_next(request, response)
print(f"Total {len(results)} files found on the drive folder ..")

cnt = 0
skipped = 0
# Loop through the files and download them if they don't already exist
for file in results:
    file_id = file['id']
    file_name = file['name']
    local_file_path = os.path.join(LOCAL_FOLDER_PATH, file_name)
    if os.path.exists(local_file_path):
        skipped += 1
        continue
    try:
        print(f"Downloading {file_name}...")
        file_bytes = download_file(file_id)
        # download_file() returns None for entries it cannot fetch (e.g. folders)
        if file_bytes is not None:
            with open(local_file_path, 'wb') as f:
                f.write(file_bytes.getbuffer())
            cnt += 1
    except (HttpError, OSError) as error:
        # Report the failure and carry on with the remaining files
        print(f"An error occurred: {error}")
print(f"Total {len(results)} files found on the drive folder. {cnt} downloaded and {skipped} skipped")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A script to delete all contents of a Google Drive folder and also empty the trash.
from google.oauth2.service_account import Credentials | |
from googleapiclient.discovery import build | |
from googleapiclient.errors import HttpError | |
def get_drive_service():
    """Build and return a Drive v3 API client.

    Credentials are read from 'service_account.json', resolved relative to the
    current working directory.
    """
    return build(
        'drive',
        'v3',
        credentials=Credentials.from_service_account_file('service_account.json'),
    )
def empty_trash():
    """Empty the Drive trash of the service account, then print a confirmation."""
    drive = get_drive_service()
    # files().emptyTrash() deletes all files in the trash
    trash_request = drive.files().emptyTrash()
    trash_request.execute()
    print('Trash emptied.')
def delete_all_files_and_folders_in_folder(folder_id):
    """Delete every non-trashed item directly inside a Drive folder.

    The items are listed first and the user is asked to confirm on stdin before
    anything is deleted. Failures for individual items are printed and do not
    stop the run.

    Args:
        folder_id: ID of the Drive folder whose direct children are deleted.

    Returns:
        False when the user declined the confirmation prompt; True otherwise,
        including when the folder was already empty or some deletions failed.
    """
    service = get_drive_service()

    # Get a list of all files and folders in the folder
    folder_query = f"'{folder_id}' in parents and trashed = false"
    # Number of entries requested per page; the loop below fetches the remaining pages
    page_size = 100
    request = service.files().list(
        q=folder_query, fields='nextPageToken, files(id, name, mimeType)', pageSize=page_size)

    # Follow the pagination until list_next() reports there is no further page
    items = []
    while request is not None:
        response = request.execute()
        items.extend(response.get('files', []))
        request = service.files().list_next(request, response)

    if not items:
        print("No files or folders found.")
        return True

    print("The following files and folders will be deleted:")
    for item in items:
        print(f'{item["name"]} ({item["id"]})')
    confirm = input(f"Are you sure you want to delete total {len(items)} files and folders? (y/n) ")
    if confirm.strip().lower() != 'y':
        print("Operation canceled.")
        return False

    # Delete all the files and folders
    print(f"Deleting {len(items)} items in folder {folder_id}")
    failed = 0
    for item in items:
        item_id = item["id"]
        item_name = item["name"]
        item_type = item["mimeType"]
        try:
            service.files().delete(fileId=item_id).execute()
            print(f"Deleted {item_name} ({item_type})")
        except HttpError as error:
            failed += 1
            print(f"Error deleting {item_name} ({item_type}): {error}")

    # Only claim success when every deletion actually went through
    if failed:
        print(f"Finished with errors: {failed} of {len(items)} items could not be deleted.")
    else:
        print("All files and folders have been deleted.")
    return True


if __name__ == '__main__':
    folder_id = 'your_folder_id_here'  # 'your_folder_id_here'
    # Leave the trash alone when the user backed out of the deletion
    if delete_all_files_and_folders_in_folder(folder_id):
        empty_trash()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# A script to upload all files from a local folder and subfolders in it to a google drive folder. | |
# The script skips a file if another file with the same name already exists on Drive, so in a way it's resumable.
import os | |
import json | |
import time | |
from google.oauth2.service_account import Credentials | |
from googleapiclient.errors import HttpError | |
from googleapiclient.discovery import build | |
from googleapiclient.http import MediaFileUpload | |
# Set the folder ID for the folder you want to upload to. You can find the folder ID in
# the URL when you're viewing the folder in your Google Drive.
# Gotcha here is you need to share this folder as Editor with service account so it can
# access it.
FOLDER_ID = 'your_folder_id_here'
# Set the chunksize for resumable uploads (in bytes).
CHUNKSIZE = 10 * 1024 * 1024
# Load the service account credentials from a JSON file.
creds = Credentials.from_service_account_file('service_account.json')
# Create the Drive API client.
drive_service = build('drive', 'v3', credentials=creds)
# Verify the connection and print free space left.
about = drive_service.about().get(fields='user,storageQuota').execute()
print(f"Connected to Google Drive as {about['user']['emailAddress']}.")
storage_quota = about.get('storageQuota', {})
if 'limit' in storage_quota:
    free_space = int(storage_quota['limit']) - int(storage_quota.get('usage', 0))
    print(f"You have {free_space // (1024*1024)} MB of free space left.")
else:
    # The API omits 'limit' for accounts without a storage cap, so there is
    # no finite amount of free space to report.
    free_space = None
    print("This account has no storage limit.")
# Define a function to upload a file to Google Drive. | |
def upload_file(file_path):
    """Upload one local file into the Drive folder FOLDER_ID.

    The file is sent as a resumable upload in CHUNKSIZE-byte chunks and keeps
    its base name on Drive. Progress is printed after each chunk that reports
    a status, followed by the ID of the created file.
    """
    name = os.path.basename(file_path)
    metadata = {'name': name, 'parents': [FOLDER_ID]}
    payload = MediaFileUpload(file_path, chunksize=CHUNKSIZE, resumable=True)
    upload = drive_service.files().create(media_body=payload, body=metadata, fields='id')

    # next_chunk() hands back a response only once the final chunk has been sent
    created = None
    while created is None:
        progress, created = upload.next_chunk()
        if not progress:
            continue
        percent_complete = int(progress.progress() * 100)
        print(f"Uploading {name}: {percent_complete}% complete.")
    print(f"Upload of {name} complete. File ID: {created.get('id')}")
# Define a function to recursively find all files in a directory and its subdirectories. | |
def find_files(directory, parent_folders=None):
    """Recursively yield every file below ``directory``.

    Args:
        directory: Directory to walk.
        parent_folders: Optional list of folder names to put in front of each
            file's folder list. It is copied, never modified.

    Yields:
        ``(file_path, folders)`` tuples. ``file_path`` is the path of the file
        as produced by joining the walked directory and the file name.
        ``folders`` is a new list for every file: ``parent_folders`` followed
        by the folder names leading from ``directory`` down to the folder that
        holds the file (nothing is added for files directly in ``directory``).
    """
    print(f"Searching files in {directory}")
    prefix = list(parent_folders) if parent_folders is not None else []
    for root, _dirs, files in os.walk(directory):
        relative = os.path.relpath(root, directory)
        # relpath() gives '.' for the top directory itself, which adds no folder names
        nested = [] if relative == os.curdir else relative.split(os.sep)
        for file in files:
            # prefix + nested builds a fresh list, so results never alias each other
            yield os.path.join(root, file), prefix + nested
# Collect the names of the files already in the target folder.
folder_query = f"'{FOLDER_ID}' in parents and trashed = false"
# Number of entries requested per page; the loop below fetches the remaining pages
page_size = 100
request = drive_service.files().list(
    q=folder_query, fields='nextPageToken, files(id, name)', pageSize=page_size)
# A set gives constant-time "already uploaded?" checks in the upload loop
existing_files = set()
retrieved = 0
# Follow the pagination until list_next() reports there is no further page
while request is not None:
    response = request.execute()
    page = response.get('files', [])
    retrieved += len(page)
    existing_files.update(entry['name'] for entry in page)
    request = drive_service.files().list_next(request, response)
# Print the total number of files retrieved
print('Total number of files retrieved: %d' % retrieved)

cnt = 0
duplicate = 0
# Loop over all files in the current directory and its subdirectories, uploading them to Google Drive.
# Change the path to your intended directory.
# NOTE(review): './' also covers this script and, when it sits in the working
# directory, the 'service_account.json' credentials file loaded by this script --
# point this at a dedicated directory to avoid uploading credentials.
for file_path, parent_folders in find_files('./'):
    file_name = os.path.basename(file_path)
    if file_name in existing_files:
        duplicate += 1
        continue
    upload_file(file_path)
    # Remember the name so a same-named file in another subfolder is skipped too
    existing_files.add(file_name)
    cnt += 1
print('Total number of files uploaded: %d' % cnt)
print('Files skipped uploading as they already existed: %d' % duplicate)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment