This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import awswrangler as wr | |
from datetime import date | |
import logging | |
import pandas as pd | |
from pandas_datareader.data import DataReader | |
class StockDataETLRunner: | |
def __init__( | |
self, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# search replace AWS_ACCOUNT_ID with your AWS account ID and adjust the variables below (line 3-7), especially your API key | |
# if your flow needs access to other AWS resources other than S3, add those in the task role policy: line 96-108 | |
export AWS_REGION=us-east-1 | |
export ECS_CLUSTER_NAME=prefectEcsCluster | |
export ECS_LOG_GROUP_NAME=/ecs/prefectEcsAgent | |
export ECS_SERVICE_NAME=prefectECSAgent | |
export PREFECT_API_KEY=your_Prefect_Cloud_API_key | |
export AWS_PAGER="" | |
aws ssm put-parameter --type SecureString --name PREFECT__CLOUD__API_KEY --value $PREFECT_API_KEY --region $AWS_REGION --overwrite |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import prefect | |
from prefect.storage import S3 | |
from prefect.run_configs import ECSRun | |
from prefect import task, Flow | |
FLOW_NAME = "ecs_demo" | |
STORAGE = S3( | |
bucket="prefect-datasets", | |
key=f"flows/{FLOW_NAME}.py", | |
stored_as_script=True, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Flow, Parameter, task | |
@task(log_stdout=True) | |
def test_parameter_value(user_input: str): | |
print(f"Got parameter value: {user_input}") | |
with Flow('child') as flow: | |
num_days_parameter = Parameter('num_days', required=True) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import prefect | |
from prefect import Flow, Parameter, task | |
from prefect.tasks.prefect import StartFlowRun | |
logger = logging.getLogger(__name__) | |
@task | |
def calculate_flow_end_date(end_date: str): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from prefect import Task, task, Flow | |
from prefect.utilities.aws import get_boto_client | |
from prefect.utilities.tasks import defaults_from_attrs | |
class ECSRunTask(Task): | |
def __init__( | |
self, | |
task_definition: str, | |
cluster: str = "default", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flow_utilities.db_utils import load_df_to_db | |
import pandas as pd | |
import prefect | |
from prefect import task, Flow | |
from prefect.executors import LocalDaskExecutor | |
from prefect.run_configs import LocalRun | |
from prefect.storage import GitHub | |
FLOW_NAME = "01_extract_load" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inlude this file in: /Users/your_user_name/.dbt/profiles.yml | |
jaffle_shop: | |
outputs: | |
dev: | |
client_session_keep_alive: false | |
dbname: postgres | |
host: localhost | |
password: your_password_1234 | |
port: 5432 | |
schema: jaffle_shop |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This flow is largely inspired by https://medium.com/slateco-blog/prefect-orchestrating-dbt-10f3ca0baea9 | |
""" | |
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.client import Secret | |
from prefect.tasks.dbt.dbt import DbtShellTask | |
from prefect.storage import GitHub | |
from prefect.triggers import all_finished | |
from prefect.run_configs import LocalRun |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This flow is largely inspired by https://medium.com/slateco-blog/prefect-orchestrating-dbt-10f3ca0baea9 | |
""" | |
import prefect | |
from prefect import task, Flow, Parameter | |
from prefect.client import Secret | |
from prefect.tasks.dbt.dbt import DbtShellTask | |
from prefect.storage import GitHub | |
from prefect.triggers import all_finished | |
from prefect.run_configs import LocalRun |