Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save jacopotagliabue/0c939690b282ed4cda4f78805f6e7d8d to your computer and use it in GitHub Desktop.

Select an option

Save jacopotagliabue/0c939690b282ed4cda4f78805f6e7d8d to your computer and use it in GitHub Desktop.
agentic_ETL_code_on_bauplan
import bauplan
import boto3
from datetime import datetime
def _list_parquet_keys(s3, bucket, prefix=''):
    """Return every object key under ``prefix`` in ``bucket`` that ends in ``.parquet``.

    Uses the ``list_objects_v2`` paginator: a single call returns at most
    1000 keys, and a response with no matching objects carries no
    ``Contents`` entry at all.
    """
    paginator = s3.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(
            obj['Key']
            for obj in page.get('Contents', [])
            if obj['Key'].endswith('.parquet')
        )
    return keys


def _table_name_for_key(key):
    """Return the basename of ``key`` with its trailing ``.parquet`` removed."""
    # Slice off only the suffix; str.replace would also strip inner occurrences.
    return key.rsplit('/', 1)[-1][:-len('.parquet')]


def _check_id_unique(bpln, table_name, branch_name):
    """Raise RuntimeError if the table's first ``id`` column (any case) has duplicate values."""
    table = bpln.get_table(table=table_name, ref=branch_name)
    id_cols = [c.name for c in table.fields if c.name.lower() == 'id']
    if not id_cols:
        return
    id_col = id_cols[0]
    # COUNT(...) skips NULLs, so NULL ids do not count as duplicates.
    # NOTE(review): table_name comes straight from the S3 key and is interpolated
    # unquoted; keys containing characters that are not valid in a bare SQL
    # identifier may break this query — confirm bauplan's quoting rules.
    res = bpln.query(
        query=f"SELECT COUNT(DISTINCT {id_col}) = COUNT({id_col}) as is_unique FROM {table_name}",
        ref=branch_name,
    ).to_pandas()
    if not res.iloc[0]['is_unique']:
        raise RuntimeError(f"ID column not unique in {table_name}")


def main():
    """Load every parquet file in an S3 bucket into bauplan via a temporary branch.

    Each ``.parquet`` object becomes (or replaces) a table named after the
    file's basename. All work happens on a fresh branch forked from
    ``main``. The branch is merged into ``main`` and then deleted only if
    both conditions hold:
    - every import succeeded;
    - every table with an ``id`` column (any case) has distinct non-NULL ids.

    Returns:
        The branch name on success, or ``None`` if any step failed. On
        failure the error is printed. If the branch had already been
        created, it is left in place and its name is printed so it can be
        inspected or cleaned up.
    """
    branch_name = None
    branch_created = False
    try:
        # Initialize clients
        bpln = bauplan.Client()
        username = bpln.info().user.username
        s3 = boto3.client('s3')
        bucket = 'xxx'

        # List parquet files and map each to its target table. Fail before
        # touching bauplan if two files would land in the same table:
        # replace=True below would otherwise silently drop the earlier data.
        tables = {}
        for key in _list_parquet_keys(s3, bucket):
            table_name = _table_name_for_key(key)
            if table_name in tables:
                raise RuntimeError(
                    f"Files {tables[table_name]} and {key} both map to table {table_name}"
                )
            tables[table_name] = key

        # Create branch
        branch_name = f"{username}.etl_{int(datetime.now().timestamp())}"
        bpln.create_branch(branch=branch_name, from_ref='main')
        branch_created = True

        # Process each file: (re)create the table, import, validate.
        for table_name, key in tables.items():
            s3_uri = f's3://{bucket}/{key}'
            bpln.create_table(
                table=table_name,
                search_uri=s3_uri,
                branch=branch_name,
                replace=True,
            )
            import_state = bpln.import_data(
                table=table_name,
                search_uri=s3_uri,
                branch=branch_name,
            )
            if import_state.error:
                raise RuntimeError(f"Import failed for {table_name}: {import_state.error}")
            _check_id_unique(bpln, table_name, branch_name)

        # Merge to main if all checks pass
        bpln.merge_branch(source_ref=branch_name, into_branch='main')
        bpln.delete_branch(branch_name)
        return branch_name
    except Exception as e:
        # Top-level boundary: report and signal failure with None.
        print(f"ETL failed: {e}")
        if branch_created:
            print(f"Branch {branch_name} has been left in place for inspection")
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment