Query BigQuery and upload files to an S3 bucket
import pandas as pd
import warnings
import boto3
from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.fs import S3FileSystem
from google.oauth2 import service_account

warnings.filterwarnings("ignore")

credentials = service_account.Credentials.from_service_account_file(
    r""  # Add the path to your service account credentials file
)

# Query to pull data from BigQuery
query = """
"""
df = pd.read_gbq(query, project_id='big_query_project', dialect='standard', credentials=credentials)

# Convert the event date column to datetime type
df['date'] = pd.to_datetime(df['event_date'])

# Convert the timestamp column to its string representation
df['event_ts_pretty'] = df['event_ts_pretty'].astype(str)

# Sort DataFrame by date for efficient partitioning
df = df.sort_values('date')

# AWS credentials and S3 bucket info
aws_access_key = ''
aws_secret_key = ''
bucket_name = 'bucket'
prefix = 'folder1/folder2/'

# Initialize S3 client
s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
s3_bucket = bucket_name
# s3_folder = 'google_analytics/personal_creations/events'  # Modify this to match your folder structure
s3_folder = prefix.rstrip('/')  # Normalize the prefix so object keys don't contain a double slash

# Truncate the 'date' column to a calendar date (no time component) for partitioning
df['date'] = pd.to_datetime(df['date']).dt.date

# Write one Parquet file per date and upload it to S3 under date= partitions
for date in df['date'].unique():
    parquet_buffer = BytesIO()
    filtered_df = df[df['date'] == date]
    if not filtered_df.empty:  # Check if the filtered DataFrame is not empty
        filtered_df.to_parquet(parquet_buffer, index=False)
        parquet_buffer.seek(0)  # Reset buffer position before uploading
        s3_object_key = f'{s3_folder}/date={date}/{date}.parquet'
        s3.upload_fileobj(parquet_buffer, s3_bucket, s3_object_key)
        print(f"Uploaded {s3_object_key} to S3")
    else:
        print(f"No data for date {date}, skipping upload")