Skip to content

Instantly share code, notes, and snippets.

View anna-anisienia's full-sized avatar

Anna Geller anna-anisienia

View GitHub Profile
@anna-anisienia
anna-anisienia / basic-prefect-etl-flow.py
Created August 24, 2020 20:35
Basic ETL pipeline with Prefect
from prefect import Flow, task
import pandas as pd
def score_check(grade, subject, student):
"""
This is a normal "business logic" function which is not a Prefect task.
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL.
:param grade: number of points on an exam
:param subject: school subject
@anna-anisienia
anna-anisienia / basic-prefect-etl-flow.py
Created August 24, 2020 22:16
Basic ETL with Prefect using Docker storage
from prefect.environments.storage import Docker
from prefect import Flow, task
import pandas as pd
def score_check(grade, subject, student):
"""
This is a normal "business logic" function which is not a Prefect task.
If a student achieved a score > 90, multiply it by 2 for their effort! But only if the subject is not NULL.
:param grade: number of points on an exam
@anna-anisienia
anna-anisienia / dask-k8.py
Created August 25, 2020 01:59
Test Dask setup with Prefect and AWS EKS on Fargate
from prefect.environments.storage import Docker
from prefect.environments import DaskKubernetesEnvironment
from prefect import task, Flow
import random
from time import sleep
@task
def inc(x):
sleep(random.random() / 10)
from airflow.models import DagModel, TaskInstance, DagRun
from airflow.utils.decorators import apply_defaults
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.db import provide_session
class WaitForCompletion(BaseSensorOperator):
"""
Waits for a different DAG or a task in a different DAG to complete
from airflow import DAG
from lib.operators.trigger_and_wait_for_completion import trigger_dag
with DAG(dag_id='business_logic',
default_args=None, # None only for demonstration purposes - this shouldn't be None
schedule_interval=None) as dag:
staging_area = trigger_dag(task_id='staging_area',
trigger_dag_id='staging_area',
wait_for_task='finish')
bus_logic_child_dag, wait_for_bus_logic = trigger_dag(task_id='business_logic_layer',
""" Trigger child Flows from this parent Flow """
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
with Flow("FlowRunTask_Example") as flow:
staging_area = FlowRunTask(flow_name='staging_area',
project_name="InterFlow_Dependencies",
wait=True)
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',
from prefect import task, Flow
@task(log_stdout=True)
def stage_1():
    """Announce that the ``stage_1`` task is running.

    Takes no inputs and returns ``None``; its only effect is printing a
    single marker line. ``log_stdout=True`` on the decorator asks Prefect
    to forward that stdout output to the task-run logger.
    """
    message = "Executing stage_1"
    print(message)
@task(log_stdout=True)
def stage_2():
from prefect import task, Flow
@task(log_stdout=True)
def bus_logic_etl_1():
    """Announce that the ``bus_logic_etl_1`` task is running.

    Takes no inputs and returns ``None``; its only effect is printing a
    single marker line. ``log_stdout=True`` on the decorator asks Prefect
    to forward that stdout output to the task-run logger.
    """
    message = "Executing bus_logic_etl_1"
    print(message)
@task(log_stdout=True)
def bus_logic_etl_2():
from prefect import task, Flow
@task(log_stdout=True)
def data_mart_1():
    """Announce that the ``data_mart_1`` task is running.

    Takes no inputs and returns ``None``; its only effect is printing a
    single marker line. ``log_stdout=True`` on the decorator asks Prefect
    to forward that stdout output to the task-run logger.
    """
    message = "Executing data_mart_1"
    print(message)
@task(log_stdout=True)
def data_mart_2():
""" Trigger child Flows from this parent Flow """
from prefect.tasks.prefect.flow_run import FlowRunTask
from prefect import Flow
with Flow("MasterFlow") as flow:
staging_area = FlowRunTask(flow_name='staging_area',
project_name="InterFlow_Dependencies",
wait=True)
business_logic_layer = FlowRunTask(flow_name='business_logic_layer',