Skip to content

Instantly share code, notes, and snippets.

@telderfts
Created April 7, 2023 15:56
Show Gist options
  • Save telderfts/9a52686c5743a24b9169efb87ff83308 to your computer and use it in GitHub Desktop.
Save telderfts/9a52686c5743a24b9169efb87ff83308 to your computer and use it in GitHub Desktop.
Tableau op
import os
from typing import List
from dagster import job, op, DynamicOut, DynamicOutput
from dagster_aws.secretsmanager import secretsmanager_secrets_resource
from dagster_k8s import execute_k8s_job
def _read_secret(context, name):
    """Return secret ``name`` for the running op, or ``None`` if it is unset.

    The value is looked up in the ``secrets`` resource first (expected to be a
    mapping of secret name to value), so it no longer depends on the resource
    having ``add_to_environment`` enabled. The process environment is kept as
    a fallback because that is where these ops have always looked.
    """
    value = context.resources.secrets.get(name)
    if value is None:
        value = os.getenv(name)
    if value is None:
        # Still return None (callers may tolerate it), but make the gap visible
        # in the run log instead of silently passing an empty credential on.
        context.log.warning(
            f"Secret {name!r} not found in the secrets resource or environment"
        )
    return value


@op(required_resource_keys={"secrets"})
def secretsmanager_secrets_aws_key(context):
    """Return the ``aws_access_key_id`` secret (or ``None`` if missing)."""
    return _read_secret(context, "aws_access_key_id")


@op(required_resource_keys={"secrets"})
def secretsmanager_secrets_aws_secret(context):
    """Return the ``aws_secret_access_key`` secret (or ``None`` if missing)."""
    return _read_secret(context, "aws_secret_access_key")


@op(required_resource_keys={"secrets"})
def secretsmanager_secrets_tableau_user(context):
    """Return the Tableau ``username`` secret (or ``None`` if missing)."""
    return _read_secret(context, "username")


@op(required_resource_keys={"secrets"})
def secretsmanager_secrets_tableau_pass(context):
    """Return the Tableau ``password`` secret (or ``None`` if missing)."""
    return _read_secret(context, "password")
@job(resource_defs={"secrets": secretsmanager_secrets_resource})
def secrets_job():
    """Invoke each secret-reading op once against the Secrets Manager resource."""
    for read_secret_op in (
        secretsmanager_secrets_aws_key,
        secretsmanager_secrets_aws_secret,
        secretsmanager_secrets_tableau_user,
        secretsmanager_secrets_tableau_pass,
    ):
        read_secret_op()
if __name__ == "__main__":
    # Run the secrets job only when this file is executed directly. At module
    # top level it would call AWS Secrets Manager every time the module is
    # imported (e.g. whenever a tool loads these definitions), and the
    # returned result is not used anyway.
    # NOTE(review): dagster-aws appears to apply ``add_to_environment`` only
    # for the duration of the run -- confirm before relying on these
    # environment variables after this call returns.
    secrets_job.execute_in_process(
        run_config={
            "resources": {
                "secrets": {
                    "config": {
                        "region_name": "us-west-2",
                        "secrets_tag": "dagster-creds",
                        "add_to_environment": True,
                    }
                }
            }
        }
    )
@op(out=DynamicOut())
def get_orgs() -> List[DynamicOutput[str]]:
    """Fan out one dynamic output per organisation code.

    Each output's value and mapping key are both the org code.
    """
    org_codes = ("BRISTOL", "KU", "USLTUL")
    return [DynamicOutput(code, mapping_key=code) for code in org_codes]
@op
def extract_org(org):
    """Print an extract message for ``org`` and pass it through unchanged."""
    message = f"Extract: {org}"
    print(message)
    return org
@op
def attribute_org(org):
    """Print an attribute message for ``org``; produces no value."""
    message = f"Attribute: {org}"
    print(message)
@op
def validate_org(org):
    """Print a validate message for ``org``; produces no value."""
    message = f"Validate: {org}"
    print(message)
# Configuration for the ``secrets`` resource that ``dynamic_graph`` provides to
# ``extract_tableau``: secrets tagged ``dagster-creds`` in us-west-2.
_TABLEAU_SECRETS_CONFIG = {
    "region_name": "us-west-2",
    "secrets_tag": "dagster-creds",
}


def _require_secret(secrets, name):
    """Return ``secrets[name]``, raising if the secret is missing or empty.

    Failing here is preferable to launching the Kubernetes job with a blank
    credential, which would only surface later as an auth error inside the
    container.
    """
    value = secrets.get(name)
    if not value:
        raise RuntimeError(
            f"Secret {name!r} was not found in the 'secrets' resource; "
            "check the Secrets Manager region/tag configuration."
        )
    return value


@op(required_resource_keys={"secrets"})
def extract_tableau(context, orgs):
    """Launch the Tableau extractor container as a Kubernetes job.

    AWS and Tableau credentials are read from the ``secrets`` resource and
    passed to the container as environment variables.

    Args:
        context: Dagster op context. It must be the first parameter so Dagster
            injects it, and ``execute_k8s_job`` needs it as its first argument.
        orgs: collected outputs of ``extract_org``; currently only logged.

    Raises:
        RuntimeError: if any required secret is missing or empty.
    """
    secrets = context.resources.secrets
    # Resolve every credential before launching anything, so a missing secret
    # fails fast instead of starting a doomed pod.
    aws_access_key_id = _require_secret(secrets, "aws_access_key_id")
    aws_secret_access_key = _require_secret(secrets, "aws_secret_access_key")
    tableau_user = _require_secret(secrets, "username")
    tableau_pass = _require_secret(secrets, "password")

    # NOTE(review): values in env_vars are written into the Job spec in plain
    # text; consider mounting a Kubernetes Secret via ``env_secrets`` instead.
    # NOTE(review): ``orgs`` is not passed to the container -- confirm intended.
    execute_k8s_job(
        context,
        image="xxxxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/airflow-jobs:tableau_extractor-v0.2.0",
        env_vars=[
            f"AWS_ACCESS_KEY_ID={aws_access_key_id}",
            f"AWS_SECRET_ACCESS_KEY={aws_secret_access_key}",
            "EXTRACT_PATH=/opt/extract.hyper",
            "PARQUET_PATH=xxxxxxxxxxxxxxxxxxxx/donor_bom.parquet",
            "DATASOURCE_NAME=dagster_test",
            "ORG_EXCLUDE=",
            f"TABLEAU_USER={tableau_user}",
            f"TABLEAU_PASS={tableau_pass}",
            "TABLEAU_PROJECT=QA",
        ],
        # Kubernetes ResourceRequirements shape (limits/requests), not
        # Airflow's flat limit_memory/request_cpu keys.
        resources={
            "limits": {"memory": "12Gi", "cpu": "2"},
            "requests": {"memory": "12Gi", "cpu": "2"},
        },
        labels={
            "xxxxxxx/role": "datasci",
            "xxxxxxx/app": "airflow",
            "xxxxxxx/purpose": "",
            "xxxxxxx/environment": "produsa",
        },
        timeout=600,
    )
    context.log.info(f"Tableau extract: {orgs}")


@job(
    resource_defs={
        "secrets": secretsmanager_secrets_resource.configured(_TABLEAU_SECRETS_CONFIG)
    }
)
def dynamic_graph():
    """Fan out per-org extract/attribute/validate, then run one Tableau extract."""
    orgs = get_orgs()
    extract_results = orgs.map(extract_org)
    extract_results.map(attribute_org)
    extract_results.map(validate_org)
    # NOTE(review): extract_tableau depends only on the collected extract_org
    # results, so it does not wait for attribute_org/validate_org -- confirm.
    extract_tableau(extract_results.collect())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment