Skip to content

Instantly share code, notes, and snippets.

@kellan
Last active October 24, 2024 21:36
Show Gist options
  • Save kellan/2a8ff0e614ae96034f5592513f9a717a to your computer and use it in GitHub Desktop.
Save kellan/2a8ff0e614ae96034f5592513f9a717a to your computer and use it in GitHub Desktop.
import os
import io
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import boto3
from botocore.exceptions import ClientError
# Load environment variables via python-dotenv before the os.getenv() lookups
# further down run (presumably from a local .env file -- confirm deployment setup).
from dotenv import load_dotenv
load_dotenv()
# OAuth scope requested from Google: read-only access to Drive. Passed to both
# the cached-token loader and the interactive flow in google_authenticate().
GOOGLE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
def google_authenticate():
    """Return Google credentials, refreshing or re-authorizing when needed.

    Credentials cached in ``token.json`` are reused when that file exists and
    the credentials are still valid. Otherwise they are refreshed (when expired
    and a refresh token is available) or obtained through a local-server OAuth
    flow driven by ``client_secrets.json``; the result is written back to
    ``token.json`` before being returned.
    """
    token_path = 'token.json'
    creds = (
        Credentials.from_authorized_user_file(token_path, GOOGLE_SCOPES)
        if os.path.exists(token_path)
        else None
    )

    # Cached credentials that are still valid need no further work.
    if creds and creds.valid:
        return creds

    can_refresh = creds and creds.expired and creds.refresh_token
    if can_refresh:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file('client_secrets.json', GOOGLE_SCOPES)
        creds = flow.run_local_server(port=0)

    # Persist the refreshed/new credentials for the next run.
    with open(token_path, 'w') as token_file:
        token_file.write(creds.to_json())
    return creds
def list_gdrive_files_by_folder(folder_id):
    """Collect every entry whose parent is the given Google Drive folder.

    Requests pages of up to 1000 entries and keeps following ``nextPageToken``
    until the response no longer carries one.

    Returns:
        A list of the ``files`` entries from each response, requested with the
        fields ``id``, ``name`` and ``size``.
    """
    parent_query = f"'{folder_id}' in parents"
    collected = []
    next_token = None
    while True:
        response = gdrive().files().list(
            q=parent_query,
            pageSize=1000,
            fields="nextPageToken, files(id, name, size)",
            pageToken=next_token,
        ).execute()
        collected += response.get('files', [])
        next_token = response.get('nextPageToken')
        if not next_token:
            return collected
def check_s3_file_exists_and_size_matches(bucket_name, file_name, gdrive_file_size):
    """Tell whether S3 already holds ``file_name`` with the expected size.

    Returns:
        True when the object's ``ContentLength`` equals ``gdrive_file_size``;
        False when the sizes differ or ``head_object`` fails with code '404'.

    Raises:
        ClientError: for any ``head_object`` failure other than a 404.
    """
    try:
        metadata = s3().head_object(Bucket=bucket_name, Key=file_name)
    except ClientError as err:
        # A 404 means the key is absent; anything else is a real failure.
        if err.response['Error']['Code'] != '404':
            raise
        return False
    return metadata['ContentLength'] == gdrive_file_size
def stream_file_from_gdrive_to_s3(file_id, bucket_name, object_name, gdrive_file_size):
    """Stream one Drive file to S3 via chunked download and multipart upload.

    The file is skipped when ``check_s3_file_exists_and_size_matches`` reports
    that S3 already holds an object of the same size. Otherwise each chunk
    downloaded from Drive is buffered in memory and uploaded as one part of an
    S3 multipart upload, which is completed once the download reports done.

    Args:
        file_id: Drive id of the file to download.
        bucket_name: Destination S3 bucket.
        object_name: Destination S3 key.
        gdrive_file_size: Size in bytes used for the skip check.

    Any error during the transfer is printed and the multipart upload is
    aborted. Ordinary exceptions are then swallowed (best effort, so a caller
    can continue with other files); KeyboardInterrupt/SystemExit are re-raised
    after the abort so an interrupted run does not leave an incomplete
    multipart upload behind in the bucket.
    """
    # Skip the transfer when S3 already has an object of matching size.
    if check_s3_file_exists_and_size_matches(bucket_name, object_name, gdrive_file_size):
        print(f"File {object_name} already exists in S3 with the same size. Skipping upload.")
        return
    print(f"File {object_name} does not exist in S3 or sizes differ. Proceeding with upload.")

    request = gdrive().files().get_media(fileId=file_id)
    client = s3()

    # Start multipart upload to S3
    upload_id = client.create_multipart_upload(Bucket=bucket_name, Key=object_name)['UploadId']

    chunk_size = 256 * 1024 * 1024  # 256 MB chunks
    parts = []
    file_stream = io.BytesIO()
    downloader = MediaIoBaseDownload(file_stream, request, chunksize=chunk_size)
    done = False
    try:
        while not done:
            status, done = downloader.next_chunk()
            if not status:
                continue
            part_number = len(parts) + 1
            # Upload the buffered chunk as one part of the multipart upload.
            part = client.upload_part(
                Body=file_stream.getvalue(),
                Bucket=bucket_name,
                Key=object_name,
                UploadId=upload_id,
                PartNumber=part_number,
            )
            # Part number + ETag are required to complete the upload later.
            parts.append({
                'PartNumber': part_number,
                'ETag': part['ETag'],
            })
            # Empty the buffer so the next chunk starts from a clean stream.
            file_stream.seek(0)
            file_stream.truncate()
            print(f"Uploaded part {part_number}, {status.progress() * 100:.2f}% complete")
        # Complete the multipart upload
        client.complete_multipart_upload(
            Bucket=bucket_name,
            Key=object_name,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )
        print(f"Completed upload of {object_name} to {bucket_name}")
    except BaseException as e:
        # BaseException (not just Exception) so Ctrl-C / SystemExit also abort
        # the upload instead of leaving its already-uploaded parts behind.
        print(f"Error during upload: {str(e)}")
        client.abort_multipart_upload(
            Bucket=bucket_name,
            Key=object_name,
            UploadId=upload_id,
        )
        print("Aborted multipart upload due to error.")
        if not isinstance(e, Exception):
            raise
def upload_all_files_from_gdrive_folder_to_s3(folder_id, bucket_name, prefix=''):
    """Copy every entry listed in a Google Drive folder to S3, one at a time.

    Each entry is stored under the key ``prefix`` + its Drive name. Nothing is
    uploaded (and a notice is printed) when the folder listing is empty.
    """
    drive_entries = list_gdrive_files_by_folder(folder_id)
    if not drive_entries:
        print(f"No files found in the Google Drive folder: {folder_id}")
        return

    print(f"Found {len(drive_entries)} files in Google Drive folder {folder_id}.")
    for entry in drive_entries:
        entry_id = entry['id']
        entry_name = entry['name']
        # Entries without a 'size' field (e.g. folders) are treated as 0 bytes.
        size_bytes = int(entry.get('size', 0))
        target_key = f"{prefix}{entry_name}"
        print(f"Processing file: {entry_name} (Size: {size_bytes} bytes)")
        stream_file_from_gdrive_to_s3(entry_id, bucket_name, target_key, size_bytes)
# Module-wide cache for the boto3 S3 client; populated on the first s3() call.
_s3_client = None


def s3():
    """Return the shared boto3 S3 client, creating it on first use.

    Settings are read from the environment: ``AWS_ACCESS_KEY_ID``,
    ``AWS_ACCESS_KEY_SECRET`` and ``AWS_REGION``. When ``AWS_ACCESS_KEY_SECRET``
    is unset, the standard ``AWS_SECRET_ACCESS_KEY`` variable is used instead,
    so an environment configured with the usual AWS variable names does not
    hand boto3 an access key id without its secret.
    """
    global _s3_client
    if _s3_client is not None:
        return _s3_client

    secret_key = os.getenv("AWS_ACCESS_KEY_SECRET") or os.getenv("AWS_SECRET_ACCESS_KEY")
    _s3_client = boto3.client(
        's3',
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=secret_key,
        region_name=os.getenv("AWS_REGION"),
    )
    return _s3_client
# Module-wide cache for the Drive service; populated on the first gdrive() call.
_gdrive_service = None


def gdrive():
    """Return the shared Drive v3 service object, building it on first use."""
    global _gdrive_service
    if not _gdrive_service:
        # First call: authenticate to Google Drive and build the service once.
        _gdrive_service = build('drive', 'v3', credentials=google_authenticate())
    return _gdrive_service
def create_s3_bucket_if_not_exists(bucket_name, region=None):
    """Create an S3 bucket if it doesn't already exist.

    Args:
        bucket_name: Name of the bucket to check and, if missing, create.
        region: Region for a newly created bucket. When omitted, the region the
            shared S3 client is configured for is used.

    Raises:
        ClientError: if the existence check fails with anything other than a
            404, or if bucket creation fails.
    """
    client = s3()
    try:
        client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        # Only a 404 means "bucket does not exist"; re-raise everything else.
        if e.response['Error']['Code'] != '404':
            print(f"Error checking bucket: {e}")
            raise
    else:
        print(f"Bucket '{bucket_name}' already exists.")
        return

    target_region = region or client.meta.region_name
    create_kwargs = {'Bucket': bucket_name}
    # us-east-1 is the default location and must not be sent as a
    # LocationConstraint; every other region has to be named explicitly.
    if target_region and target_region not in ('us-east-1', 'aws-global'):
        create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': target_region}
    try:
        client.create_bucket(**create_kwargs)
        print(f"Bucket '{bucket_name}' created.")
    except ClientError as error:
        print(f"Error creating bucket: {error}")
        raise
if __name__ == '__main__':
    # Configuration comes from the environment.
    folder_id = os.getenv("FOLDER_ID")
    bucket_name = os.getenv("S3_BUCKET_NAME")
    prefix = os.getenv("S3_PREFIX") or ''

    # Fail early with a clear message instead of passing None into the API calls.
    missing = [
        name
        for name, value in (("FOLDER_ID", folder_id), ("S3_BUCKET_NAME", bucket_name))
        if not value
    ]
    if missing:
        raise SystemExit(f"Missing required environment variable(s): {', '.join(missing)}")

    # Ensure the S3 bucket exists
    create_s3_bucket_if_not_exists(bucket_name)
    # Fetch all files from the Google Drive folder and upload to S3
    upload_all_files_from_gdrive_folder_to_s3(folder_id, bucket_name, prefix)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment