Created
April 22, 2023 07:26
-
-
Save abhishekmishragithub/2a95bf52fe5cbae3fb372bc1f6b3df9a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python | |
| # coding: utf-8 | |
| import os | |
| import argparse | |
| from time import time | |
| import pandas as pd | |
| from sqlalchemy import create_engine | |
| from prefect import flow, task | |
| from prefect.tasks import task_input_hash | |
| from datetime import timedelta | |
| from prefect_sqlalchemy import SqlAlchemyConnector | |
@task(log_prints=True, tags=["extract"], cache_key_fn=task_input_hash, cache_expiration=timedelta(days=1))
def extract_data(url: str):
    """Download the taxi-trip CSV at *url* and return its first 100k-row chunk.

    The file is saved locally, read with pandas in chunks of 100,000 rows,
    and only the first chunk is returned with its pickup/dropoff columns
    parsed as datetimes.

    Args:
        url: HTTP(S) location of a ``.csv`` or ``.csv.gz`` trip-data file.

    Returns:
        pandas.DataFrame: the first 100,000 rows of the file.
    """
    # The backup files are gzipped, and it's important to keep the correct
    # extension for pandas to be able to open the file. Name the local copy
    # after the remote file instead of a hard-coded month so any .csv.gz
    # URL works (identical result for the default 2021-01 URL).
    if url.endswith('.csv.gz'):
        csv_name = os.path.basename(url)
    else:
        csv_name = 'output.csv'

    # Use the stdlib downloader instead of os.system(f"wget {url} ..."):
    # interpolating an unquoted URL into a shell command is a shell-injection
    # risk and silently does nothing when wget is not installed.
    from urllib.request import urlretrieve
    urlretrieve(url, csv_name)

    df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
    df = next(df_iter)  # only the first chunk is ingested

    df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
    df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
    return df
@task(log_prints=True)
def transform_data(df):
    """Drop trips recorded with zero passengers, logging counts before and after.

    Args:
        df: trip DataFrame containing a ``passenger_count`` column.

    Returns:
        pandas.DataFrame: the rows whose ``passenger_count`` is non-zero.
    """
    print(f"pre: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    keep_mask = df['passenger_count'] != 0
    df = df[keep_mask]
    print(f"post: missing passenger count: {df['passenger_count'].isin([0]).sum()}")
    return df
@task(log_prints=True, retries=3)
def load_data(table_name, df):
    """Write *df* into the Postgres table *table_name*.

    Credentials come from the pre-registered Prefect SQLAlchemy block named
    "postgres-connector"; the table is recreated from the frame's schema,
    then the rows are appended.

    Args:
        table_name: destination table name.
        df: DataFrame of rows to load.
    """
    connector = SqlAlchemyConnector.load("postgres-connector")
    with connector.get_connection(begin=False) as engine:
        # head(n=0) writes just the column definitions, replacing any
        # existing table before the full append.
        df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
        df.to_sql(name=table_name, con=engine, if_exists='append')
| @flow(name="Subflow", log_prints=True) | |
| def log_subflow(table_name: str): | |
| print(f"Logging Subflow for: {table_name}") | |
| @flow(name="Ingest Data") | |
| def main_flow(table_name: str = "yellow_taxi_trips"): | |
| user = "postgres" | |
| password = "admin" | |
| host = "localhost" | |
| port = "5433" | |
| db = "ny_taxi" | |
| table_name = "yellow_taxi_trips" | |
| csv_url = "https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz" | |
| log_subflow(table_name) | |
| raw_data = extract_data(csv_url) | |
| data = transform_data(raw_data) | |
| load_data(table_name, data) | |
if __name__ == '__main__':
    # Run the flow directly as a script. NOTE(review): the table name passed
    # here ("yellow_trips") differs from the flow's default
    # ("yellow_taxi_trips") — confirm which table is intended.
    main_flow(table_name="yellow_trips")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment