Skip to content

Instantly share code, notes, and snippets.

Avatar

Anna Geller anna-geller

View GitHub Profile
View cache_it_only_same_flow_run.py
from datetime import timedelta
from prefect.tasks import task_input_hash
from prefect import task, flow
import time
from prefect.context import get_run_context
def cache_within_flow_run(context, parameters):
    """Cache key function that scopes task caching to the current flow run.

    Prefixes the standard ``task_input_hash`` with the flow-run ID so a cached
    task result is only reused by tasks inside the same flow run, never across
    flow runs.

    Args:
        context: the task-run context Prefect passes to a ``cache_key_fn``.
        parameters: the task's call parameters, also supplied by Prefect.

    Returns:
        A string cache key of the form ``"<flow_run_id>-<input_hash>"``.
    """
    id_ = get_run_context().flow_run.dict().get('id')
    key = f"{id_}-{task_input_hash(context, parameters)}"
    # Fix: the key must be returned — a cache_key_fn that returns None
    # disables caching entirely.
    return key
View dask_ex_map.py
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
import time
@task
def extract() -> list:
    """Return the fixed list of raw values feeding the downstream tasks."""
    get_run_logger().info("extract")
    return [1, 2, 3, 4, 5, 6]
View dask_ex.py
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
import time
@task
def extract() -> list:
    """Extract step: emit the integers 1 through 6 for downstream processing."""
    log = get_run_logger()
    log.info("extract")
    return list(range(1, 7))
View dask_ex_map.py
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
# Define some tasks for us to run in our flow
@task
def extract() -> list:
    """Produce the sample dataset consumed by the rest of the flow."""
    run_logger = get_run_logger()
    run_logger.info("extract")
    values = [1, 2, 3, 4, 5, 6]
    return values
View dask_ex.py
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
# Define some tasks for us to run in our flow
@task
def extract() -> list:
    """Yield the hard-coded input list used by this example flow."""
    get_run_logger().info("extract")
    return list(range(1, 7))
View s3_bucket.py
import asyncio
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket
@flow
async def aws_s3_bucket():
aws_creds = await AwsCredentials.load("default")
s3_bucket = S3Bucket(
View taxi_data-deployment.yaml
###
### A complete description of a Prefect Deployment for flow 'taxi-data'
###
name: yellow
description: null
version: 130746a12ffa6ed31ffe6f21fa338fcc
# The work queue that will handle this deployment's runs
work_queue_name: default
tags: []
parameters:
View main.py
"""
prefect deployment build week_2_dataflow/main.py:taxi_data -n yellow -q default -a
prefect deployment build week_2_dataflow/main.py:taxi_data -n yellow -q default -a --param table_name=green_tripdata
prefect deployment build week_2_dataflow/main.py:parent -n yellow -q default -a
prefect deployment build week_2_dataflow/main.py:parent -n yellow -q default -a --param table_name=green_tripdata
"""
import awswrangler as wr
import pandas as pd
from prefect import task, flow, get_run_logger
from prefect.task_runners import SequentialTaskRunner
View dashboards.py
from prefect import task, flow, get_run_logger
from dataplatform.blocks import Workspace, SnowflakePandas
from typing import Any, Dict
@task
def update_customers_dashboards() -> None:
    """Placeholder task that refreshes the customers dashboard extracts.

    Currently only logs a confirmation message; the actual refresh logic
    (e.g. invalidating the BI tool's cache) is left to be implemented.
    """
    logger = get_run_logger()
    # your logic here - might be clearing cache of your BI tool
    logger.info("Customers dashboard extracts updated! 📊")
View ingest_transform.py
from datetime import date
from prefect import flow
from flows.transformation.jaffle_shop.dbt_run_from_manifest import dbt_jaffle_shop
from flows.ingestion.ingest_jaffle_shop import raw_data_jaffle_shop
from flows.analytics.dashboards import dashboards
from flows.ml.sales_forecast import sales_forecast
@flow
def jaffle_shop_ingest_transform(