Skip to content

Instantly share code, notes, and snippets.

View anna-geller's full-sized avatar

Anna Geller anna-geller

View GitHub Profile
import awswrangler as wr
from datetime import date
import logging
import pandas as pd
from pandas_datareader.data import DataReader
class StockDataETLRunner:
def __init__(
self,
# Search for AWS_ACCOUNT_ID and replace it with your AWS account ID, then adjust the variables below (lines 3-7), especially your API key.
# If your flow needs access to AWS resources other than S3, add them to the task role policy (lines 96-108).
export AWS_REGION=us-east-1
export ECS_CLUSTER_NAME=prefectEcsCluster
export ECS_LOG_GROUP_NAME=/ecs/prefectEcsAgent
export ECS_SERVICE_NAME=prefectECSAgent
export PREFECT_API_KEY=your_Prefect_Cloud_API_key
export AWS_PAGER=""
aws ssm put-parameter --type SecureString --name PREFECT__CLOUD__API_KEY --value $PREFECT_API_KEY --region $AWS_REGION --overwrite
import prefect
from prefect.storage import S3
from prefect.run_configs import ECSRun
from prefect import task, Flow
FLOW_NAME = "ecs_demo"
STORAGE = S3(
bucket="prefect-datasets",
key=f"flows/{FLOW_NAME}.py",
stored_as_script=True,
from prefect import Flow, Parameter, task
@task(log_stdout=True)
def test_parameter_value(user_input: str):
    """Print the parameter value this task received.

    The task is declared with ``log_stdout=True`` and reports its input
    via ``print``.

    Args:
        user_input: The parameter value to report.
    """
    message = f"Got parameter value: {user_input}"
    print(message)
with Flow('child') as flow:
num_days_parameter = Parameter('num_days', required=True)
import logging
import prefect
from prefect import Flow, Parameter, task
from prefect.tasks.prefect import StartFlowRun
logger = logging.getLogger(__name__)
@task
def calculate_flow_end_date(end_date: str):
from prefect import Task, task, Flow
from prefect.utilities.aws import get_boto_client
from prefect.utilities.tasks import defaults_from_attrs
class ECSRunTask(Task):
def __init__(
self,
task_definition: str,
cluster: str = "default",
from flow_utilities.db_utils import load_df_to_db
import pandas as pd
import prefect
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import LocalRun
from prefect.storage import GitHub
FLOW_NAME = "01_extract_load"
# Include this file in: /Users/your_user_name/.dbt/profiles.yml
jaffle_shop:
outputs:
dev:
client_session_keep_alive: false
dbname: postgres
host: localhost
password: your_password_1234
port: 5432
schema: jaffle_shop
"""
This flow is largely inspired by https://medium.com/slateco-blog/prefect-orchestrating-dbt-10f3ca0baea9
"""
import prefect
from prefect import task, Flow, Parameter
from prefect.client import Secret
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.storage import GitHub
from prefect.triggers import all_finished
from prefect.run_configs import LocalRun
"""
This flow is largely inspired by https://medium.com/slateco-blog/prefect-orchestrating-dbt-10f3ca0baea9
"""
import prefect
from prefect import task, Flow, Parameter
from prefect.client import Secret
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.storage import GitHub
from prefect.triggers import all_finished
from prefect.run_configs import LocalRun