Skip to content

Instantly share code, notes, and snippets.

@t3ndai
Last active September 27, 2021 12:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save t3ndai/3d79a4432ea22786d9d4c53d3d28a4a1 to your computer and use it in GitHub Desktop.
Save t3ndai/3d79a4432ea22786d9d4c53d3d28a4a1 to your computer and use it in GitHub Desktop.
python etl
# Script entry guard for the Dask version of the ETL.
# NOTE(review): indentation was lost in this paste; presumably everything down
# to the trailing main() call was nested under this guard -- confirm.
if __name__ == "__main__":
import pandas as pd
# Star import; the code below uses english_category_name from this module.
from english_category_name import *
from dask.distributed import Client, LocalCluster
# Create a LocalCluster and a Client attached to it; the functions below
# submit their work through this module-level `client`.
cluster = LocalCluster()
client = Client(cluster)
def extract_payments_csv():
    """Load the order-payments CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_order_payments_dataset.csv'
    return pd.read_csv(csv_path)
def extract_order_items_csv():
    """Load the order-items CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_order_items_dataset.csv'
    return pd.read_csv(csv_path)
def extract_customers_csv():
    """Load the customers CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_customers_dataset.csv'
    return pd.read_csv(csv_path)
def extract_orders_csv():
    """Load the orders CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_orders_dataset.csv'
    return pd.read_csv(csv_path)
def extract_sellers_csv():
    """Load the sellers CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_sellers_dataset.csv'
    return pd.read_csv(csv_path)
def extract_products_csv():
    """Load the products CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_products_dataset.csv'
    return pd.read_csv(csv_path)
def extract_product_translations():
    """Load the category-name translation CSV into a pandas DataFrame.

    Returns:
        The parsed DataFrame, indexed by its ``product_category_name`` column.
    """
    csv_path = './brazilian-ecommerce/product_category_name_translation.csv'
    return pd.read_csv(csv_path, index_col='product_category_name')
def extract_geolocation_csv():
    """Load the geolocation CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_geolocation_dataset.csv'
    return pd.read_csv(csv_path)
def extract_marketing_leads_csv():
    """Load the marketing-qualified-leads CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './marketing-funnel-olist/olist_marketing_qualified_leads_dataset.csv'
    return pd.read_csv(csv_path)
def extract_marketing_closed_deals_csv():
    """Load the closed-deals CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    # The assignment used to break right after `=` with no continuation,
    # which is a syntax error; the call is now a single expression.
    csv_path = './marketing-funnel-olist/olist_closed_deals_dataset.csv'
    return pd.read_csv(csv_path)
def transform_category_names(products_DF, products_category_translations_DF):
    """Return ``products_DF`` with its category names run through ``english_category_name``.

    Args:
        products_DF: DataFrame with a ``product_category_name`` column.
        products_category_translations_DF: Lookup table passed unchanged as
            the first argument of ``english_category_name``.

    Returns:
        A new DataFrame with the ``product_category_name`` column replaced.
        ``products_DF`` itself is not modified, so the result held by the
        upstream Dask future is not altered after the fact.
    """
    def translate(category_name):
        # english_category_name comes from the star import of the
        # english_category_name module; presumably it maps a category name
        # to its English form -- confirm against that module.
        return english_category_name(
            products_category_translations_DF, category_name)

    translated = products_DF['product_category_name'].apply(translate)
    return products_DF.assign(product_category_name=translated)
def transform_orders_dates(orders_DF):
    """Return ``orders_DF`` with its five order date columns parsed to datetimes.

    Args:
        orders_DF: DataFrame containing the order date columns listed below.

    Returns:
        A new DataFrame with the same columns, where each listed column has
        been converted with ``pd.to_datetime``. ``orders_DF`` itself is not
        modified, so the result held by the upstream Dask future stays intact.
    """
    date_columns = (
        'order_delivered_customer_date',
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_estimated_delivery_date',
    )
    parsed = {name: pd.to_datetime(orders_DF[name]) for name in date_columns}
    # assign() builds a new frame instead of writing into the caller's object.
    return orders_DF.assign(**parsed)
def transform_order_items_payments(orders_DF, payments_DF):
    """Inner-join two DataFrames on their shared ``order_id`` column.

    Args:
        orders_DF: Left-hand DataFrame of the merge.
        payments_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``order_id`` values present on both sides
        are kept.
    """
    merged = pd.merge(
        left=orders_DF,
        right=payments_DF,
        how='inner',
        on='order_id',
    )
    return merged
def transform_order_items_payments_products(order_list_payments, products_DF):
    """Inner-join two DataFrames on their shared ``product_id`` column.

    Args:
        order_list_payments: Left-hand DataFrame of the merge.
        products_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``product_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=order_list_payments,
        right=products_DF,
        how='inner',
        on='product_id',
    )
    return merged
def transform_orders_customers(orders_payments_products, customers_DF):
    """Inner-join two DataFrames on their shared ``customer_id`` column.

    Args:
        orders_payments_products: Left-hand DataFrame of the merge.
        customers_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``customer_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=orders_payments_products,
        right=customers_DF,
        how='inner',
        on='customer_id',
    )
    return merged
def transform_sellers_marketing_closed(sellers_DF, marketing_closed_deals_DF):
    """Left-join the second DataFrame onto the first on ``seller_id``.

    Args:
        sellers_DF: Left-hand DataFrame; all of its rows are kept.
        marketing_closed_deals_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; rows without a match on the right carry
        missing values in the right-hand columns.
    """
    merged = pd.merge(
        left=sellers_DF,
        right=marketing_closed_deals_DF,
        how='left',
        on='seller_id',
    )
    return merged
def transform_sellers(orders_customers_products, sellers_marketing):
    """Inner-join two DataFrames on their shared ``seller_id`` column.

    Args:
        orders_customers_products: Left-hand DataFrame of the merge.
        sellers_marketing: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``seller_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=orders_customers_products,
        right=sellers_marketing,
        how='inner',
        on='seller_id',
    )
    return merged
def main():
    """Run the extract/transform/load pipeline through the Dask ``client``.

    Each step is submitted with ``client.submit``; the returned futures are
    passed directly as arguments to later submissions. Only the final
    ``.result()`` call blocks, after which the flat table is written to the
    ``olist_data_flat`` parquet output with brotli compression.
    """
    # extraction
    # NOTE: the geolocation and marketing-leads extracts used to be submitted
    # here as well, but no transform consumed them, so the reads were wasted.
    payments_DF = client.submit(extract_payments_csv)
    order_items_DF = client.submit(extract_order_items_csv)
    sellers_DF = client.submit(extract_sellers_csv)
    orders_DF = client.submit(extract_orders_csv)
    products_DF = client.submit(extract_products_csv)
    customers_DF = client.submit(extract_customers_csv)
    product_category_translations_DF = client.submit(
        extract_product_translations)
    marketing_closed_deals_DF = client.submit(
        extract_marketing_closed_deals_csv)

    # transformation
    orders_DF_transformed = client.submit(
        transform_orders_dates, orders_DF)
    products_DF_transformed = client.submit(
        transform_category_names, products_DF,
        product_category_translations_DF)
    orders_payments = client.submit(
        transform_order_items_payments, orders_DF_transformed, payments_DF)
    # transform_order_items_payments is a plain inner merge on order_id, so
    # it is reused here to attach the order items.
    orders_items_payments = client.submit(
        transform_order_items_payments, orders_payments, order_items_DF)
    order_items_payments_products = client.submit(
        transform_order_items_payments_products, orders_items_payments,
        products_DF_transformed)
    orders_customers_products = client.submit(
        transform_orders_customers, order_items_payments_products,
        customers_DF)
    sellers_marketing = client.submit(
        transform_sellers_marketing_closed, sellers_DF,
        marketing_closed_deals_DF)
    # NOTE(review): the arguments are passed in the opposite order to
    # transform_sellers' parameter names. The merge is an inner join, so this
    # only affects row/column order; kept as-is to preserve the output layout.
    sellers_customers_orders_products = client.submit(
        transform_sellers, sellers_marketing, orders_customers_products)

    # load
    # to_parquet's return value is not kept: the original bound it to an
    # unused name.
    flat_DF = sellers_customers_orders_products.result()
    flat_DF.to_parquet('olist_data_flat', compression='brotli')
# Run the Dask pipeline defined above.
# NOTE(review): presumably this call sat inside the `if __name__ == "__main__":`
# guard at the top of this script before indentation was lost -- confirm.
main()
from prefect import task, Flow
from dask.distributed import Client, LocalCluster
from prefect.engine.executors import DaskExecutor
import pandas as pd
from english_category_name import *
import dataset
# Create a LocalCluster and a Client attached to it. main() below reads
# client.scheduler.address to point the Prefect DaskExecutor at this cluster.
# NOTE(review): these statements run whenever the module is loaded, not only
# when it is executed as a script -- confirm that is intended.
cluster = LocalCluster()
client = Client(cluster)
@task
def extract_payments_csv():
    """Load the order-payments CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_order_payments_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_order_items_csv():
    """Load the order-items CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_order_items_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_customers_csv():
    """Load the customers CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_customers_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_orders_csv():
    """Load the orders CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_orders_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_sellers_csv():
    """Load the sellers CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_sellers_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_products_csv():
    """Load the products CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_products_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_product_translations():
    """Load the category-name translation CSV into a pandas DataFrame.

    Returns:
        The parsed DataFrame, indexed by its ``product_category_name`` column.
    """
    csv_path = './brazilian-ecommerce/product_category_name_translation.csv'
    return pd.read_csv(csv_path, index_col='product_category_name')
@task
def extract_geolocation_csv():
    """Load the geolocation CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './brazilian-ecommerce/olist_geolocation_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_marketing_leads_csv():
    """Load the marketing-qualified-leads CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './marketing-funnel-olist/olist_marketing_qualified_leads_dataset.csv'
    return pd.read_csv(csv_path)
@task
def extract_marketing_closed_deals_csv():
    """Load the closed-deals CSV into a pandas DataFrame.

    Returns:
        The DataFrame parsed by ``pd.read_csv`` from the fixed relative path.
    """
    csv_path = './marketing-funnel-olist/olist_closed_deals_dataset.csv'
    return pd.read_csv(csv_path)
@task
def transform_category_names(products_DF, products_category_translations_DF):
    """Return ``products_DF`` with its category names run through ``english_category_name``.

    Args:
        products_DF: DataFrame with a ``product_category_name`` column.
        products_category_translations_DF: Lookup table passed unchanged as
            the first argument of ``english_category_name``.

    Returns:
        A new DataFrame with the ``product_category_name`` column replaced.
        ``products_DF`` itself is not modified, so the upstream task's result
        is not altered after the fact.
    """
    def translate(category_name):
        # english_category_name comes from the star import of the
        # english_category_name module; presumably it maps a category name
        # to its English form -- confirm against that module.
        return english_category_name(
            products_category_translations_DF, category_name)

    translated = products_DF['product_category_name'].apply(translate)
    return products_DF.assign(product_category_name=translated)
@task
def transform_orders_dates(orders_DF):
    """Return ``orders_DF`` with its five order date columns parsed to datetimes.

    Args:
        orders_DF: DataFrame containing the order date columns listed below.

    Returns:
        A new DataFrame with the same columns, where each listed column has
        been converted with ``pd.to_datetime``. ``orders_DF`` itself is not
        modified, so the upstream task's result stays intact.
    """
    date_columns = (
        'order_delivered_customer_date',
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_estimated_delivery_date',
    )
    parsed = {name: pd.to_datetime(orders_DF[name]) for name in date_columns}
    # assign() builds a new frame instead of writing into the caller's object.
    return orders_DF.assign(**parsed)
@task
def connect_sql_file():
    """Open the ``olist_data.db`` SQLite database through ``dataset``.

    Returns:
        The object returned by ``dataset.connect``.
    """
    # A relative SQLite file path needs three slashes. With only two,
    # 'olist_data.db' is parsed as a host name and the URL is rejected.
    db = dataset.connect('sqlite:///olist_data.db')
    return db
@task
def transform_order_items_payments(orders_DF, payments_DF):
    """Inner-join two DataFrames on their shared ``order_id`` column.

    Args:
        orders_DF: Left-hand DataFrame of the merge.
        payments_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``order_id`` values present on both sides
        are kept.
    """
    merged = pd.merge(
        left=orders_DF,
        right=payments_DF,
        how='inner',
        on='order_id',
    )
    return merged
@task
def transform_order_items_payments_products(order_list_payments, products_DF):
    """Inner-join two DataFrames on their shared ``product_id`` column.

    Args:
        order_list_payments: Left-hand DataFrame of the merge.
        products_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``product_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=order_list_payments,
        right=products_DF,
        how='inner',
        on='product_id',
    )
    return merged
@task
def transform_orders_customers(orders_payments_products, customers_DF):
    """Inner-join two DataFrames on their shared ``customer_id`` column.

    Args:
        orders_payments_products: Left-hand DataFrame of the merge.
        customers_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``customer_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=orders_payments_products,
        right=customers_DF,
        how='inner',
        on='customer_id',
    )
    return merged
@task
def transform_sellers_marketing_closed(sellers_DF, marketing_closed_deals_DF):
    """Left-join the second DataFrame onto the first on ``seller_id``.

    Args:
        sellers_DF: Left-hand DataFrame; all of its rows are kept.
        marketing_closed_deals_DF: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; rows without a match on the right carry
        missing values in the right-hand columns.
    """
    merged = pd.merge(
        left=sellers_DF,
        right=marketing_closed_deals_DF,
        how='left',
        on='seller_id',
    )
    return merged
@task
def transform_sellers(orders_customers_products, sellers_marketing):
    """Inner-join two DataFrames on their shared ``seller_id`` column.

    Args:
        orders_customers_products: Left-hand DataFrame of the merge.
        sellers_marketing: Right-hand DataFrame of the merge.

    Returns:
        The merged DataFrame; only ``seller_id`` values present on both
        sides are kept.
    """
    merged = pd.merge(
        left=orders_customers_products,
        right=sellers_marketing,
        how='inner',
        on='seller_id',
    )
    return merged
def main():
    """Build the ``olist_ETL`` Prefect flow, run it on Dask, and write the result.

    The flow is assembled inside the ``Flow`` context, executed with a
    ``DaskExecutor`` pointed at the module-level ``client``'s scheduler, and
    the final merged table is then written to the ``olist_data_flat`` parquet
    output with brotli compression.

    Raises:
        RuntimeError: If the flow run does not finish successfully.
    """
    # setup Flow
    with Flow('olist_ETL') as flow:
        # extraction
        # NOTE: the geolocation and marketing-leads extracts used to be added
        # to the flow as well, but no transform consumed them.
        payments_DF = extract_payments_csv()
        order_items_DF = extract_order_items_csv()
        sellers_DF = extract_sellers_csv()
        orders_DF = extract_orders_csv()
        products_DF = extract_products_csv()
        customers_DF = extract_customers_csv()
        product_category_translations_DF = extract_product_translations()
        marketing_closed_deals_DF = extract_marketing_closed_deals_csv()

        # transformation
        orders_DF_transformed = transform_orders_dates(orders_DF)
        products_DF_transformed = transform_category_names(
            products_DF, product_category_translations_DF)
        orders_payments = transform_order_items_payments(
            orders_DF_transformed, payments_DF)
        # transform_order_items_payments is a plain inner merge on order_id,
        # so it is reused here to attach the order items.
        orders_items_payments = transform_order_items_payments(
            orders_payments, order_items_DF)
        order_items_payments_products = transform_order_items_payments_products(
            orders_items_payments, products_DF_transformed)
        orders_customers_products = transform_orders_customers(
            order_items_payments_products, customers_DF)
        sellers_marketing = transform_sellers_marketing_closed(
            sellers_DF, marketing_closed_deals_DF)
        # NOTE(review): the arguments are passed in the opposite order to
        # transform_sellers' parameter names. The merge is an inner join, so
        # this only affects row/column order; kept as-is to preserve layout.
        sellers_customers_orders_products = transform_sellers(
            sellers_marketing, orders_customers_products)

    executor = DaskExecutor(address=client.scheduler.address)
    flow_state = flow.run(executor=executor)
    if not flow_state.is_successful():
        raise RuntimeError(
            f'olist_ETL flow run did not succeed: {flow_state!r}')

    # load
    # Inside the Flow context the names above are Task objects, not data, so
    # the DataFrame must be read from the finished run's state rather than
    # from `task.result` at build time.
    flat_DF = flow_state.result[sellers_customers_orders_products].result
    flat_DF.to_parquet('olist_data_flat', compression='brotli')
# Only build and run the Prefect flow when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment