Created
April 22, 2023 07:43
-
-
Save abhishekmishragithub/79083d3469f8174f872cfee6a6291d5b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pandas as pd | |
| from pathlib import Path | |
| from prefect import flow, task | |
| from prefect_gcp.cloud_storage import GcsBucket | |
| from prefect_gcp import GcpCredentials | |
@task(retries=3)
def extract_from_gcs(color: str, year: int, month: int) -> Path:
    """Fetch one month of taxi trip data from the ``de-gcs`` bucket block.

    The object ``data/<color>/<color>_tripdata_<year>-<MM>.parquet`` is
    requested through ``GcsBucket.get_directory`` with ``local_path='./'``.
    The task is retried up to three times by Prefect.

    Args:
        color: Taxi colour used in both the folder and file name.
        year: Four-digit year of the data.
        month: Month number; rendered zero-padded to two digits.

    Returns:
        The object path, as a path relative to the current working directory.
    """
    object_name = f'data/{color}/{color}_tripdata_{year}-{month:02}.parquet'
    bucket = GcsBucket.load('de-gcs')
    # NOTE(review): assumes get_directory writes the blob to
    # <local_path>/<blob name>, so the object path is also the local path;
    # this also assumes the block has no bucket_folder prefix — confirm.
    bucket.get_directory(from_path=object_name, local_path='./')
    local_file = Path(object_name)
    return local_file
@task()
def transform(path: Path) -> pd.DataFrame:
    """Load a trip-data parquet file and fill missing passenger counts with 0.

    Prints the number of missing ``passenger_count`` values before and after
    the fill.

    Args:
        path: Local path to a parquet file containing a ``passenger_count``
            column.

    Returns:
        The loaded DataFrame with NaN ``passenger_count`` values replaced by 0.
    """
    df = pd.read_parquet(path)
    print(f"pre: missing passenger count: {df['passenger_count'].isna().sum()}")
    # Assign the result back rather than calling
    # ``df['passenger_count'].fillna(0, inplace=True)``. That chained in-place
    # call acts on an intermediate Series. pandas 2.2 warns about it, and
    # under Copy-on-Write (the default in pandas 3.0) it no longer modifies
    # ``df``.
    df['passenger_count'] = df['passenger_count'].fillna(0)
    print(f"post: missing passenger count: {df['passenger_count'].isna().sum()}")
    return df
@task()
def write_bq(
    df: pd.DataFrame,
    destination_table: str = 'trips_data_all.yellow_taxi_trips',
    project_id: str = 'crafty-elf-376416',
) -> None:
    """Append a DataFrame to a BigQuery table.

    Credentials are taken from the Prefect ``GcpCredentials`` block named
    ``gcp-credentials``. Rows are uploaded in chunks of 500,000 and appended
    to any existing data.

    Args:
        df: Rows to upload.
        destination_table: Target table in ``dataset.table`` form. The default
            keeps existing ``write_bq(df)`` calls writing to the yellow-taxi
            table.
        project_id: GCP project that owns the destination table.
    """
    gcp_credentials_block = GcpCredentials.load('gcp-credentials')
    # NOTE: DataFrame.to_gbq has been deprecated since pandas 2.2 in favour of
    # pandas_gbq.to_gbq. It delegates to the pandas-gbq package, which must be
    # installed either way.
    df.to_gbq(
        destination_table=destination_table,
        project_id=project_id,
        credentials=gcp_credentials_block.get_credentials_from_service_account(),
        chunksize=500_000,
        if_exists='append',  # add rows; never replace the existing table
    )
@flow()
def etl_gcs_to_bq(color: str = 'yellow', year: int = 2021, month: int = 1):
    """Main ETL flow to load one month of trip data from GCS into BigQuery.

    Steps: download the parquet file from GCS, fill missing passenger counts,
    then append the result to BigQuery.

    Args:
        color: Taxi colour to load; defaults to ``'yellow'``.
        year: Year of the data; defaults to 2021.
        month: Month of the data; defaults to 1 (January).

    The defaults reproduce the January 2021 yellow-taxi run, so calling
    ``etl_gcs_to_bq()`` with no arguments behaves as before. A deployment or
    caller can now choose another month or colour.
    """
    path = extract_from_gcs(color, year, month)
    df = transform(path)
    # NOTE(review): write_bq(df) writes to write_bq's own default destination,
    # which does not depend on ``color``. Route other colours to their own
    # table before running with color != 'yellow'.
    write_bq(df)
if __name__ == '__main__':
    # Script entry point: run the flow once with its built-in settings.
    etl_gcs_to_bq()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.