Created
April 7, 2023 15:56
-
-
Save telderfts/9a52686c5743a24b9169efb87ff83308 to your computer and use it in GitHub Desktop.
Tableau op
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import List | |
from dagster import job, op, DynamicOut, DynamicOutput | |
from dagster_aws.secretsmanager import secretsmanager_secrets_resource | |
from dagster_k8s import execute_k8s_job | |
@op(required_resource_keys={'secrets'})
def secretsmanager_secrets_aws_key(context):
    """Return the value of the ``aws_access_key_id`` environment variable.

    The op declares the ``secrets`` resource as a requirement; the value
    itself is looked up in the process environment.  Presumably the
    variable is populated by that resource when it is configured with
    ``add_to_environment`` -- confirm against the job's run config.

    Returns:
        The variable's value, or ``None`` when it is not set.
    """
    return os.environ.get("aws_access_key_id")
@op(required_resource_keys={'secrets'})
def secretsmanager_secrets_aws_secret(context):
    """Return the value of the ``aws_secret_access_key`` environment variable.

    The op declares the ``secrets`` resource as a requirement; the value
    itself is looked up in the process environment.  Presumably the
    variable is populated by that resource when it is configured with
    ``add_to_environment`` -- confirm against the job's run config.

    Returns:
        The variable's value, or ``None`` when it is not set.
    """
    return os.environ.get("aws_secret_access_key")
@op(required_resource_keys={'secrets'})
def secretsmanager_secrets_tableau_user(context):
    """Return the value of the ``username`` environment variable.

    The op declares the ``secrets`` resource as a requirement; the value
    itself is looked up in the process environment.  Presumably the
    variable is populated by that resource when it is configured with
    ``add_to_environment`` -- confirm against the job's run config.

    Returns:
        The variable's value, or ``None`` when it is not set.
    """
    return os.environ.get("username")
@op(required_resource_keys={'secrets'})
def secretsmanager_secrets_tableau_pass(context):
    """Return the value of the ``password`` environment variable.

    The op declares the ``secrets`` resource as a requirement; the value
    itself is looked up in the process environment.  Presumably the
    variable is populated by that resource when it is configured with
    ``add_to_environment`` -- confirm against the job's run config.

    Returns:
        The variable's value, or ``None`` when it is not set.
    """
    return os.environ.get("password")
@job(resource_defs={'secrets': secretsmanager_secrets_resource})
def secrets_job():
    """Job that runs each of the four secret-reading ops once.

    The ``secrets`` resource key is bound to
    ``secretsmanager_secrets_resource``; the ops have no data
    dependencies on one another.
    """
    secret_ops = (
        secretsmanager_secrets_aws_key,
        secretsmanager_secrets_aws_secret,
        secretsmanager_secrets_tableau_user,
        secretsmanager_secrets_tableau_pass,
    )
    # Invoking an op inside the job body adds it to the graph; the
    # invocation order matches the original definition.
    for secret_op in secret_ops:
        secret_op()
# Configuration handed to the ``secrets`` resource for the in-process run
# below: AWS region, the tag used to select secrets, and a flag asking the
# resource to export the selected secrets as environment variables.
_secrets_resource_config = {
    'region_name': 'us-west-2',
    'secrets_tag': 'dagster-creds',
    'add_to_environment': True,
}

# NOTE(review): this call sits at module top level, so the job is executed
# every time the module is imported -- confirm that is intended.
secrets_job.execute_in_process(
    run_config={'resources': {'secrets': {'config': _secrets_resource_config}}},
)
@op(out=DynamicOut())
def get_orgs() -> List[DynamicOutput[str]]:
    """Emit one dynamic output per organisation code.

    Returns:
        A list of ``DynamicOutput`` objects for ``BRISTOL``, ``KU`` and
        ``USLTUL`` (in that order); each uses the organisation code as
        both its value and its mapping key.
    """
    org_codes = ("BRISTOL", "KU", "USLTUL")
    return [DynamicOutput(code, mapping_key=code) for code in org_codes]
@op
def extract_org(org):
    """Print an "Extract" line for *org* and pass *org* through unchanged.

    Args:
        org: The value to report and forward to downstream ops.

    Returns:
        The same *org* value that was received.
    """
    message = f"Extract: {org}"
    print(message)
    return org
@op
def attribute_org(org):
    """Print an "Attribute" line for *org*.

    Args:
        org: The value to report.  Nothing is returned.
    """
    message = f"Attribute: {org}"
    print(message)
@op
def validate_org(org):
    """Print a "Validate" line for *org*.

    Args:
        org: The value to report.  Nothing is returned.
    """
    message = f"Validate: {org}"
    print(message)
def _require_secret(context, name):
    """Return the secret called *name* from the ``secrets`` resource.

    Raises:
        KeyError: If the resource did not provide a value for *name*, so a
            missing credential fails loudly instead of being passed to the
            Kubernetes job as the literal text ``None``.
    """
    value = context.resources.secrets.get(name)
    if value is None:
        raise KeyError(
            f"Secret {name!r} was not provided by the 'secrets' resource; "
            "check the resource's 'secrets_tag' / 'secrets' configuration."
        )
    return value


@op(required_resource_keys={'secrets'})
def extract_tableau(context, orgs):
    """Run the Tableau extractor image as a Kubernetes job.

    Credentials are read from the ``secrets`` resource and handed to the
    container as ``KEY=VALUE`` environment variables.

    Args:
        context: Dagster op execution context.  Dagster requires it to be
            the first parameter; it is also the first argument that
            ``execute_k8s_job`` expects.
        orgs: The collected results of the upstream mapped ops.  Only used
            for the final progress message.

    Raises:
        KeyError: If one of the required secrets is missing.
    """
    # Resolve the actual secret values.  Interpolating the secret *ops*
    # themselves would embed the repr of an op definition, not a credential.
    aws_key = _require_secret(context, "aws_access_key_id")
    aws_secret = _require_secret(context, "aws_secret_access_key")
    tableau_user = _require_secret(context, "username")
    tableau_pass = _require_secret(context, "password")

    execute_k8s_job(
        context,
        image="xxxxxxxxxxx.dkr.ecr.us-west-2.amazonaws.com/airflow-jobs:tableau_extractor-v0.2.0",
        env_vars=[
            f"AWS_ACCESS_KEY_ID={aws_key}",
            f"AWS_SECRET_ACCESS_KEY={aws_secret}",
            "EXTRACT_PATH=/opt/extract.hyper",
            "PARQUET_PATH=xxxxxxxxxxxxxxxxxxxx/donor_bom.parquet",
            "DATASOURCE_NAME=dagster_test",
            "ORG_EXCLUDE=",
            f"TABLEAU_USER={tableau_user}",
            f"TABLEAU_PASS={tableau_pass}",
            "TABLEAU_PROJECT=QA",
        ],
        # Kubernetes container resource requirements use the
        # ``limits`` / ``requests`` layout.
        resources={
            "limits": {"memory": "12Gi", "cpu": "2"},
            "requests": {"memory": "12Gi", "cpu": "2"},
        },
        labels={
            "xxxxxxx/role": "datasci",
            "xxxxxxx/app": "airflow",
            "xxxxxxx/purpose": "",
            "xxxxxxx/environment": "produsa",
        },
        timeout=600,
    )
    print(f"Tableau extract: {orgs}")


@job(resource_defs={'secrets': secretsmanager_secrets_resource})
def dynamic_graph():
    """Fan out over the organisations, then run the Tableau extract.

    ``get_orgs`` yields one dynamic output per organisation;
    ``extract_org`` is mapped over them, and ``attribute_org`` and
    ``validate_org`` are each mapped over the extract results.  The
    collected extract results are finally passed to ``extract_tableau``.

    The ``secrets`` resource is bound here because ``extract_tableau``
    requires it.  The resource still has to be configured (for example
    with ``region_name`` and ``secrets_tag`` in the run config) so that it
    actually selects the needed secrets.
    """
    orgs = get_orgs()
    extract_results = orgs.map(extract_org)
    extract_results.map(attribute_org)
    extract_results.map(validate_org)
    extract_tableau(extract_results.collect())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment