Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created November 9, 2022 19:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/ae317cee75b9b170adb8f4f78194500c to your computer and use it in GitHub Desktop.
Save anna-geller/ae317cee75b9b170adb8f4f78194500c to your computer and use it in GitHub Desktop.
from prefect import task, flow, get_run_logger
from dataplatform.blocks import Workspace, SnowflakePandas
from typing import Any, Dict
@task
def update_customers_dashboards() -> None:
logger = get_run_logger()
# your logic here - might be clearing cash of your BI tool
logger.info("Customers dashboard extracts updated! 📊")
@task
def update_sales_dashboards() -> None:
logger = get_run_logger()
# your logic here - might be clearing cash of your BI tool
logger.info("Sales dashboard extracts updated! 📊")
@task
def extract_current_kpis() -> Dict[str, Any]:
sql_revenue = "SELECT SUM(REVENUE) as revenue FROM STG_CUSTOMER_CONVERSIONS;"
sql_orders = "SELECT COUNT(ORDER_ID) as nr_orders FROM ORDERS;"
block = SnowflakePandas.load("default")
revenue = block.read_sql(sql_revenue)["revenue"][0]
orders = block.read_sql(sql_orders)["nr_orders"][0]
return dict(revenue=revenue, nr_orders=orders)
@task
def send_kpi_report(kpis: Dict[str, Any]) -> None:
workspace = Workspace.load("default")
for key, val in kpis.items():
workspace.send_alert(f"The current {key} KPI is {val} 🚀")
@task
def reverse_etl(kpis: Dict[str, Any]) -> None:
logger = get_run_logger()
logger.info(kpis) # loads kpis to source systems
@flow
def dashboards():
update_customers_dashboards.submit()
update_sales_dashboards.submit()
kpis = extract_current_kpis.submit()
send_kpi_report.submit(kpis)
reverse_etl.submit(kpis)
if __name__ == "__main__":
dashboards()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment