@ryandhubbard
Created October 10, 2023 20:36
Query BigQuery and upload the results to an S3 bucket, partitioned by date
import warnings
from io import BytesIO

import boto3
import pandas as pd
import pyarrow  # noqa: F401 - pandas uses pyarrow as the parquet engine
from google.oauth2 import service_account

warnings.filterwarnings("ignore")

# Load BigQuery credentials from a service account key file
credentials = service_account.Credentials.from_service_account_file(
    r""  # Add the path to the service account credentials file here
)
# Query to pull data from bigquery
query = """
"""
df = pd.read_gbq(query, project_id='big_query_project', dialect='standard', credentials=credentials)
# Derive a datetime 'date' column from the 'event_date' column
df['date'] = pd.to_datetime(df['event_date'])
# Convert datetime column to string representation
df['event_ts_pretty'] = df['event_ts_pretty'].astype(str)
# Sort DataFrame by date for efficient partitioning
df = df.sort_values('date')
# AWS credentials and S3 bucket info
aws_access_key = ''
aws_secret_key = ''
bucket_name = 'bucket'
prefix = 'folder1/folder2/'
# Initialize S3 client
s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key)
s3_bucket = bucket_name
# s3_folder = 'google_analytics/personal_creations/events' # You can modify this according to your folder structure
s3_folder = prefix
# Truncate the 'date' column to a calendar date so each partition covers one day
df['date'] = pd.to_datetime(df['date']).dt.date
# Write each day's rows to an in-memory parquet file and upload it to S3
for date in df['date'].unique():
    filtered_df = df[df['date'] == date]
    if not filtered_df.empty:  # Skip days with no rows
        parquet_buffer = BytesIO()
        filtered_df.to_parquet(parquet_buffer, index=False)
        parquet_buffer.seek(0)  # Reset buffer position before uploading
        # prefix already ends with '/', so don't add another slash here
        s3_object_key = f'{s3_folder}date={date}/{date}.parquet'
        s3.upload_fileobj(parquet_buffer, s3_bucket, s3_object_key)
        print(f"Uploaded {s3_object_key} to S3")
    else:
        print(f"No data for date {date}, skipping upload")