Skip to content

Instantly share code, notes, and snippets.

@muleyprasad
Last active March 15, 2023 13:39
Show Gist options
  • Save muleyprasad/99dfaa57102bf9a02d5961f8f4f7c31f to your computer and use it in GitHub Desktop.
Upload, download & delete file and folders from google drive using a service account.
import io
import os
import ssl
from urllib.request import urlretrieve

from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
ssl._create_default_https_context = ssl._create_unverified_context
# Export formats for different file types
export_formats = {
'application/vnd.google-apps.document': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'application/vnd.google-apps.spreadsheet': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
'application/vnd.google-apps.presentation': 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
}
# Download file function
# def download_file(file_id):
# try:
# file = drive_service.files().get(fileId=file_id).execute()
# if file['mimeType'].split('/')[0] in ['image', 'video']:
# request = drive_service.files().get_media(fileId=file_id)
# file_bytes = io.BytesIO(request.execute())
# return file_bytes
# elif file['mimeType'] in export_formats:
# request = drive_service.files().export_media(fileId=file_id, mimeType=export_formats[file['mimeType']])
# file_bytes = io.BytesIO(request.execute())
# return file_bytes
# else:
# raise Exception(f"No download URL or export format available for {file['name']}")
# except HttpError as error:
# raise Exception(f"An error occurred: {error}")
def download_file(file_id):
file = drive_service.files().get(fileId=file_id).execute()
if file['mimeType'].startswith('image/') or file['mimeType'].startswith('video/'):
request = drive_service.files().get_media(fileId=file_id)
file_bytes = io.BytesIO()
downloader = MediaIoBaseDownload(file_bytes, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f'Download {int(status.progress() * 100)}.')
file_bytes.seek(0)
return file_bytes
elif file['mimeType'] == 'application/vnd.google-apps.document':
request = drive_service.files().export_media(fileId=file_id, mimeType='application/vnd.openxmlformats-officedocument.wordprocessingml.document')
file_bytes = io.BytesIO()
downloader = MediaIoBaseDownload(file_bytes, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f'Download {int(status.progress() * 100)}.')
file_bytes.seek(0)
return file_bytes
elif file['mimeType'] == 'application/vnd.google-apps.spreadsheet':
request = drive_service.files().export_media(fileId=file_id, mimeType='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
file_bytes = io.BytesIO()
downloader = MediaIoBaseDownload(file_bytes, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f'Download {int(status.progress() * 100)}.')
file_bytes.seek(0)
return file_bytes
elif file['mimeType'] == 'application/vnd.google-apps.presentation':
request = drive_service.files().export_media(fileId=file_id, mimeType='application/vnd.openxmlformats-officedocument.presentationml.presentation')
file_bytes = io.BytesIO()
downloader = MediaIoBaseDownload(file_bytes, request)
done = False
while done is False:
status, done = downloader.next_chunk()
print(f'Download {int(status.progress() * 100)}.')
file_bytes.seek(0)
return file_bytes
else:
print('Error: No download URL or export format available')
# Replace with the ID of the Google Drive folder you want to download files from
FOLDER_ID = 'YOUR_FOLDER_ID_HERE'
# Replace with the path to your local folder where you want to save the files
LOCAL_FOLDER_PATH = '.'
# Replace with the path to your service account credentials JSON file
SERVICE_ACCOUNT_FILE = 'service_account.json'
# Set up the Google Drive API client
creds = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=['https://www.googleapis.com/auth/drive'])
drive_service = build('drive', 'v3', credentials=creds)
# Get a list of all files in the folder
# Get a list of all files and folders in the folder
folder_query = f"'{FOLDER_ID}' in parents and trashed = false"
# Set page size to a large number to retrieve all files in one request
page_size = 100
# Create a request to list all files in the folder with the specified fields and page size
request = drive_service.files().list(q=folder_query, fields='nextPageToken, files(id, name, mimeType)', pageSize=page_size)
# Initialize list to store all files
results = []
# Use pagination to retrieve all pages of results
while request is not None:
response = request.execute()
files = response.get('files', [])
results.extend(files)
request = drive_service.files().list_next(request, response)
print(f"Total {len(results)} files found on the drive folder ..")
cnt = 0
skipped = 0
# Loop through the files and download them if they don't already exist
for file in results:
file_id = file['id']
file_name = file['name']
local_file_path = os.path.join(LOCAL_FOLDER_PATH, file_name)
if os.path.exists(local_file_path):
# print(f"Skipping {file_name} - file already exists locally")
skipped += 1
else:
try:
print(f"Downloading {file_name}...")
file_bytes = download_file(file_id)
if file_bytes:
with open(local_file_path, 'wb') as f:
f.write(file_bytes.getbuffer())
except error:
print(f"An error occurred: {error}")
print(f"Total {len(results)} files found on the drive folder. {cnt} downloaded and {skipped} skipped")
# A script to delete all contents of google drive folder and also empty the trash
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
def get_drive_service():
credentials = Credentials.from_service_account_file('service_account.json')
service = build('drive', 'v3', credentials=credentials)
return service
def empty_trash():
service = get_drive_service()
# Call the files().emptyTrash() method to delete all files in the trash
service.files().emptyTrash().execute()
print('Trash emptied.')
def delete_all_files_and_folders_in_folder(folder_id):
service = get_drive_service()
# Get a list of all files and folders in the folder
folder_query = f"'{folder_id}' in parents and trashed = false"
#results = service.files().list(q=query, fields="nextPageToken, files(id, name, mimeType)").execute()
#items = results.get("files", [])
# Set page size to a large number to retrieve all files in one request
page_size = 100
# Create a request to list all files in the folder with the specified fields and page size
request = service.files().list(q=folder_query, fields='nextPageToken, files(id, name, mimeType)', pageSize=page_size)
# Initialize list to store all files
items = []
# Use pagination to retrieve all pages of results
while request is not None:
response = request.execute()
files = response.get('files', [])
items.extend(files)
request = service.files().list_next(request, response)
if not items:
print("No files or folders found.")
return
print("The following files and folders will be deleted:")
for item in items:
print(f'{item["name"]} ({item["id"]})')
confirm = input(f"Are you sure you want to delete total {len(items)} files and folders? (y/n) ")
if confirm.lower() != 'y':
print("Operation canceled.")
return
# Delete all the files and folders
print(f"Deleting {len(items)} items in folder {folder_id}")
for item in items:
item_id = item["id"]
item_name = item["name"]
item_type = item["mimeType"]
try:
service.files().delete(fileId=item_id).execute()
print(f"Deleted {item_name} ({item_type})")
except HttpError as error:
print(f"Error deleting {item_name} ({item_type}): {error}")
print("All files and folders have been deleted.")
if __name__ == '__main__':
folder_id = 'your_folder_id_here' #'your_folder_id_here'
delete_all_files_and_folders_in_folder(folder_id)
empty_trash()
# A script to upload all files from a local folder and subfolders in it to a google drive folder.
# The script skips the file if another file with same name exists on drive, so in a way its resumable.
import os
import json
import time
from google.oauth2.service_account import Credentials
from googleapiclient.errors import HttpError
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
# Set the folder ID for the folder you want to upload to. You can find the folder ID in
# the URL when you're viewing the folder in your Google Drive.
# Gotcha here is you need to share this folder as Editor with service account so it can
# access it.
FOLDER_ID = 'your_folder_id_here'
# Set the chunksize for resumable uploads (in bytes).
CHUNKSIZE = 10 * 1024 * 1024
# Load the service account credentials from a JSON file.
creds = Credentials.from_service_account_file('service_account.json')
# Create the Drive API client.
drive_service = build('drive', 'v3', credentials=creds)
# Verify the connection and print free space left.
about = drive_service.about().get(fields='user,storageQuota').execute()
free_space = int(about['storageQuota']['limit']) - int(about['storageQuota']['usage'])
print(f"Connected to Google Drive as {about['user']['emailAddress']}.")
print(f"You have {free_space // (1024*1024)} MB of free space left.")
# Define a function to upload a file to Google Drive.
def upload_file(file_path):
file_name = os.path.basename(file_path)
media = MediaFileUpload(file_path, chunksize=CHUNKSIZE, resumable=True)
file_metadata = {'name': file_name, 'parents': [FOLDER_ID]}
request = drive_service.files().create(media_body=media, body=file_metadata, fields='id')
response = None
while response is None:
status, response = request.next_chunk()
if status:
percent_complete = int(status.progress() * 100)
print(f"Uploading {file_name}: {percent_complete}% complete.")
print(f"Upload of {file_name} complete. File ID: {response.get('id')}")
# Define a function to recursively find all files in a directory and its subdirectories.
def find_files(directory, parent_folders=None):
print(f"Searching files in {directory}")
for root, dirs, files in os.walk(directory):
if parent_folders is None:
parent_folders = []
parent_folders.append(os.path.basename(root))
for file in files:
yield os.path.join(root, file), parent_folders
# Get a list of files already in the target folder.
folder_query = f"'{FOLDER_ID}' in parents and trashed = false"
# Set page size to a large number to retrieve all files in one request
page_size = 100
# Create a request to list all files in the folder with the specified fields and page size
request = drive_service.files().list(q=folder_query, fields='nextPageToken, files(id, name)', pageSize=page_size)
# Initialize list to store all files
existing_files = []
# Use pagination to retrieve all pages of results
while request is not None:
response = request.execute()
files = {f['name']: f['id'] for f in response.get('files', [])} # response.get('files', [])
existing_files.extend(files)
request = drive_service.files().list_next(request, response)
# Print the total number of files retrieved
print('Total number of files retrieved: %d' % len(existing_files))
# print(existing_files)
cnt = 0
duplicate = 0
# Loop over all files in the current directory and its subdirectories, uploading them to Google Drive.
# Change the path to your intended directory
for file_path, parent_folders in find_files('./'):
if os.path.basename(file_path) in existing_files:
duplicate += 1
# print(f"{file_path} already exists in Google Drive.")
else:
upload_file(file_path)
existing_files.append(os.path.basename(file_path))
cnt += 1
print('Total number of files uploaded: %d' % cnt)
print('Files skipped uploading as they already existed: %d' % duplicate)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment