import os
import io

from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request

import boto3
from botocore.exceptions import ClientError

# Load environment variables
from dotenv import load_dotenv

load_dotenv()

# Google Drive authentication scopes
GOOGLE_SCOPES = ['https://www.googleapis.com/auth/drive.readonly']


def google_authenticate():
    """Authenticates the user and returns the credentials."""
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', GOOGLE_SCOPES)
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'client_secrets.json', GOOGLE_SCOPES)
            creds = flow.run_local_server(port=0)
        # Persist the (new or refreshed) credentials for the next run
        with open('token.json', 'w') as token:
            token.write(creds.to_json())
    return creds


def list_gdrive_files_by_folder(folder_id):
    """Returns a list of all files in a specific Google Drive folder."""
    query = f"'{folder_id}' in parents"
    page_token = None
    files = []
    # Paginate through files
    while True:
        results = gdrive().files().list(
            q=query,
            pageSize=1000,
            fields="nextPageToken, files(id, name, size)",
            pageToken=page_token
        ).execute()
        items = results.get('files', [])
        files.extend(items)
        page_token = results.get('nextPageToken')
        if not page_token:
            break
    return files


def check_s3_file_exists_and_size_matches(bucket_name, file_name, gdrive_file_size):
    """Returns True if the object already exists in S3 with the same size as the Drive file."""
    try:
        # Get S3 object metadata to check if file exists and get its size
        response = s3().head_object(Bucket=bucket_name, Key=file_name)
        s3_file_size = response['ContentLength']
        # Compare the sizes
        return s3_file_size == gdrive_file_size
    except ClientError as e:
        # If the object does not exist, a 404 error will be raised
        if e.response['Error']['Code'] == '404':
            return False
        else:
            # Handle other errors
            raise e


def stream_file_from_gdrive_to_s3(file_id, bucket_name, object_name, gdrive_file_size):
    """Use chunked downloads from Drive, and multipart upload to S3, to stream files from one to the other."""
    # Check if file already exists in S3 and compare sizes
    if check_s3_file_exists_and_size_matches(bucket_name, object_name, gdrive_file_size):
        print(f"File {object_name} already exists in S3 with the same size. Skipping upload.")
        return
    else:
        print(f"File {object_name} does not exist in S3 or sizes differ. Proceeding with upload.")

    request = gdrive().files().get_media(fileId=file_id)

    # Start multipart upload to S3
    mpu = s3().create_multipart_upload(Bucket=bucket_name, Key=object_name)

    chunk_size = 256 * 1024 * 1024  # 256 MB chunks
    # chunk_size = 5 * 1024 * 1024  # 5 MB chunks for debugging
    part_number = 1
    parts = []

    file_stream = io.BytesIO()
    downloader = MediaIoBaseDownload(file_stream, request, chunksize=chunk_size)
    done = False

    try:
        while not done:
            status, done = downloader.next_chunk()
            if status:
                file_stream.seek(0)  # Reset stream position to the beginning
                chunk = file_stream.read()

                # Upload each chunk as a part in multipart upload
                part = s3().upload_part(
                    Body=chunk,
                    Bucket=bucket_name,
                    Key=object_name,
                    UploadId=mpu['UploadId'],
                    PartNumber=part_number
                )

                # Store part details to complete multipart upload later
                parts.append({
                    'PartNumber': part_number,
                    'ETag': part['ETag']
                })

                # Reset the stream for the next chunk
                file_stream.seek(0)
                file_stream.truncate()

                part_number += 1
                print(f"Uploaded part {part_number - 1}, {status.progress() * 100:.2f}% complete")

        # Complete the multipart upload
        s3().complete_multipart_upload(
            Bucket=bucket_name,
            Key=object_name,
            UploadId=mpu['UploadId'],
            MultipartUpload={'Parts': parts}
        )
        print(f"Completed upload of {object_name} to {bucket_name}")
    except Exception as e:
        print(f"Error during upload: {str(e)}")
        # Abort multipart upload in case of failure
        s3().abort_multipart_upload(
            Bucket=bucket_name,
            Key=object_name,
            UploadId=mpu['UploadId']
        )
        print("Aborted multipart upload due to error.")


def upload_all_files_from_gdrive_folder_to_s3(folder_id, bucket_name, prefix=''):
    """Fetches all files from a Google Drive folder and uploads them one by one to S3."""
    files = list_gdrive_files_by_folder(folder_id)
    if not files:
        print(f"No files found in the Google Drive folder: {folder_id}")
        return
    print(f"Found {len(files)} files in Google Drive folder {folder_id}.")
    for file in files:
        file_id = file['id']
        file_name = file['name']
        file_size = int(file['size']) if 'size' in file else 0  # Handle folders or empty files
        object_name = f"{prefix}{file_name}"
        print(f"Processing file: {file_name} (Size: {file_size} bytes)")
        stream_file_from_gdrive_to_s3(file_id, bucket_name, object_name, file_size)


_s3_client = None


def s3():
    """Lazily creates and caches a boto3 S3 client from environment credentials."""
    global _s3_client
    if _s3_client:
        return _s3_client
    aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
    aws_secret_access_key = os.getenv("AWS_ACCESS_KEY_SECRET")
    aws_region = os.getenv("AWS_REGION")
    _s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        region_name=aws_region
    )
    return _s3_client


_gdrive_service = None


def gdrive():
    """Lazily creates and caches the Google Drive v3 API service."""
    global _gdrive_service
    if _gdrive_service:
        return _gdrive_service
    # Authenticate to Google Drive
    creds = google_authenticate()
    _gdrive_service = build('drive', 'v3', credentials=creds)
    return _gdrive_service


def create_s3_bucket_if_not_exists(bucket_name, region=None):
    """Create an S3 bucket if it doesn't already exist."""
    try:
        s3().head_bucket(Bucket=bucket_name)
        print(f"Bucket '{bucket_name}' already exists.")
    except ClientError as e:
        # If the bucket does not exist, create it
        if e.response['Error']['Code'] == '404':
            try:
                if region:
                    s3().create_bucket(
                        Bucket=bucket_name,
                        CreateBucketConfiguration={'LocationConstraint': region}
                    )
                else:
                    s3().create_bucket(Bucket=bucket_name)
                print(f"Bucket '{bucket_name}' created.")
            except ClientError as error:
                print(f"Error creating bucket: {error}")
                raise
        else:
            print(f"Error checking bucket: {e}")
            raise


if __name__ == '__main__':
    folder_id = os.getenv("FOLDER_ID")
    bucket_name = os.getenv("S3_BUCKET_NAME")
    prefix = os.getenv("S3_PREFIX") or ''

    # Ensure the S3 bucket exists
    create_s3_bucket_if_not_exists(bucket_name)

    # Fetch all files from the Google Drive folder and upload to S3
    upload_all_files_from_gdrive_folder_to_s3(folder_id, bucket_name, prefix)
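
The script reads its configuration from environment variables via python-dotenv. As a rough sketch (variable names taken from the os.getenv calls above; every value below is a placeholder, not from the original), a .env file might look like:

# Google Drive source folder and S3 destination
FOLDER_ID=your-drive-folder-id
S3_BUCKET_NAME=your-bucket-name
S3_PREFIX=optional/key/prefix/
# AWS credentials and region; note the script reads AWS_ACCESS_KEY_SECRET,
# not the more conventional AWS_SECRET_ACCESS_KEY
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_ACCESS_KEY_SECRET=your-secret-access-key
AWS_REGION=us-east-1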