Example Dagster pipeline running on Azure Databricks
from dagster import pipeline, solid, repository, execute_pipeline
from dagster.core.definitions.mode import ModeDefinition
from dagster_databricks import databricks_pyspark_step_launcher
from pathlib import Path
from dagster_pyspark import pyspark_resource
from dagster_azure.adls2.io_manager import adls2_pickle_io_manager
from dagster_azure.adls2 import adls2_resource
databricks_mode = ModeDefinition(
    name="databricks",
    resource_defs={
        "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
            {
                "run_config": {
                    "run_name": "test dagster",
                    "cluster": {"existing": "[cluster-id]"},  # from the Databricks URL
                    "libraries": [
                        {"pypi": {"package": "dagster-azure==0.12.1"}},
                        {"pypi": {"package": "dagster-databricks==0.12.1"}},
                    ],
                },
                "databricks_host": "https://[cluster url].azuredatabricks.net/",
                "databricks_token": {
                    "env": "DB_TOKEN"  # must also be available in the Databricks environment
                },
                "local_pipeline_package_path": str(Path(__file__).parent.parent),
                "secrets_to_env_variables": [
                    {  # env var created in the Databricks runtime with the value of the secret
                        "name": "DUMMY_KEY",  # must match the env key used in this file
                        "key": "[same as storage_account_key_key]",
                        "scope": "[same as secret_scope]",
                    },
                    {  # env var created in the Databricks runtime with the value of the secret
                        "name": "DB_TOKEN",  # must match the env key used in this file
                        "key": "[key for the DATABRICKS_TOKEN in KeyVault]",
                        "scope": "[secret_scope for the key]",
                    },
                ],
                "storage": {
                    "adls2": {
                        "storage_account_name": "[storage account name]",
                        "storage_account_key_key": "[KeyVault key name where the primary/secondary Azure storage key is stored]",  # find it with the Databricks CLI: `databricks secrets list --scope [scope name]`
                        "secret_scope": "[secret scope]",
                    }
                },
            }
        ),
        "pyspark": pyspark_resource.configured(
            {
                "spark_conf": {
                    "spark.executor.memory": "2g",
                    "fs.azure.account.key.[storage account name].dfs.core.windows.net": {
                        "env": "DUMMY_KEY"
                    },  # primary/secondary Azure storage key
                }
            }
        ),
        "adls2": adls2_resource.configured(
            {
                "credential": {"key": {"env": "DUMMY_KEY"}},  # primary/secondary Azure storage key
                "storage_account": "[storage account name]",
            }
        ),
        "io_manager": adls2_pickle_io_manager.configured(
            {"adls2_file_system": "[Blob container name]", "adls2_prefix": "[Blob path]"}
        ),
    },
)
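
# --- Not part of the original gist: a minimal local-mode sketch for testing without Databricks ---
# It assumes `no_step_launcher` is importable from this path in dagster 0.12 (as in the
# Dagster PySpark examples). Add `local_mode` to `mode_defs` below and pass `--mode local`
# on the CLI to run the same solids in the local process; no Azure credentials are needed
# because the default filesystem io_manager handles outputs.
from dagster.core.definitions.no_step_launcher import no_step_launcher

local_mode = ModeDefinition(
    name="local",
    resource_defs={
        "pyspark_step_launcher": no_step_launcher,  # run each solid in the local process
        "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "1g"}}),
    },
)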

@solid(required_resource_keys={"pyspark_step_launcher", "pyspark"})
def test(context):
    context.resources.pyspark.spark_session.createDataFrame([(1, 2), (2, 1)], 'a: int, b: int')


@pipeline(mode_defs=[databricks_mode])
def my_pipeline():
    test()


@repository
def databricks_example():
    return [my_pipeline]


if __name__ == "__main__":
    execute_pipeline(my_pipeline)
# Run using: env DAGSTER_HOME=$PWD DUMMY_KEY=something DB_TOKEN=something dagster pipeline execute -f dagster_databricks.py
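
# --- Not part of the original gist: a hedged sketch of passing data between solids ---
# With the Databricks mode above, each solid output is handled by adls2_pickle_io_manager,
# i.e. pickled to the configured ADLS2 container so a downstream step (possibly a separate
# Databricks run) can load it. This sketch returns a plain int rather than the Spark
# DataFrame itself, since simple Python values pickle cleanly; the solid and pipeline
# names here are illustrative, not from the original example.
@solid(required_resource_keys={"pyspark_step_launcher", "pyspark"})
def count_rows(context):
    df = context.resources.pyspark.spark_session.createDataFrame([(1, 2), (2, 1)], 'a: int, b: int')
    return df.count()  # the int is pickled to ADLS2 by the configured io_manager


@solid
def report_count(context, n):
    context.log.info(f"row count: {n}")


@pipeline(mode_defs=[databricks_mode])
def handoff_pipeline():
    report_count(count_rows())
# Add handoff_pipeline to the repository list above to execute it with the same command.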