@slopp
Created February 27, 2024 16:14
SQL Server to GCS to BQ Dagster Pipeline Example

This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.

The actual extract and load logic is mocked out; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.
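
If you want to see roughly what real implementations of the mocked helpers could look like, here is a minimal sketch. It assumes pyodbc + pandas for the SQL Server extract and the google-cloud-storage / google-cloud-bigquery client libraries for the load; none of these libraries are used in the original files, and the connection string, bucket handling, and autodetected CSV schema below are placeholders.

# Hypothetical implementations of the mocked functions, for illustration only.
import pandas as pd
import pyodbc
from google.cloud import bigquery, storage

def sql_to_gcs_bucket(table_name, gcs_bucket):
    """Dump a SQL Server table to a CSV object in GCS and return its URI."""
    # Placeholder connection string; fill in your own server/database/credentials
    conn = pyodbc.connect(
        "DRIVER={ODBC Driver 18 for SQL Server};SERVER=...;DATABASE=...;UID=...;PWD=..."
    )
    df = pd.read_sql(f"SELECT * FROM {table_name}", conn)
    bucket_name = gcs_bucket.split("://")[-1]
    blob = storage.Client().bucket(bucket_name).blob(f"{table_name}.csv")
    blob.upload_from_string(df.to_csv(index=False), content_type="text/csv")
    return f"gs://{bucket_name}/{table_name}.csv"

def gcs_bucket_to_bq(table_name, gcs_file, bq_dataset):
    """Load a CSV in GCS into a BigQuery table and return the table id."""
    client = bigquery.Client()
    table_id = f"{bq_dataset}.{table_name}"
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,            # placeholder; supply an explicit schema in practice
        write_disposition="WRITE_TRUNCATE",
    )
    client.load_table_from_uri(gcs_file, table_id, job_config=job_config).result()
    return table_id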

First, a single pipeline for one table is created. This is demonstrated in the file dagster_mock_one_table.py. To run this example:

  1. Create a Python virtual environment and then run:

     pip install dagster dagster-webserver

  2. Copy the contents of dagster_mock_one_table.py to a file with the same name locally, then run:

     dagster dev -f dagster_mock_one_table.py

The result in Dagster's webserver looks like this:

[Screenshot: first_pipeline asset graph]

The second example, dagster_mock_many_tables.py, builds on the first example to create an asset factory that dynamically generates assets for each table. Follow the same setup steps as above, then run:

dagster dev -f dagster_mock_many_tables.py

The result:

[Screenshot: all_tables asset graph]

The run logs for a run that targets all of these assets:

[Screenshot: all_tables_run logs]

With Dagster, you get an operational lineage graph that tracks exactly which data assets (GCS extracts, BQ tables) are operated on during each run. This example only scratches the surface; Dagster also makes it easy to:

  • run incremental data loads using partitions (see the sketch after this list)
  • run pipelines in response to events (e.g. new data arriving in SQL Server) instead of only on a schedule
  • run individual assets at different cadences or automatically to propagate data changes throughout your platform
  • run data quality checks
  • alert on failures
  • attempt automatic retries
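
As one illustration of the partitioning and data quality points above, here is a minimal, hedged sketch of a daily-partitioned extract asset plus an asset check. The asset and check names are made up for this sketch and are not part of the example files.

# Illustrative only: daily partitions and an asset check for one extract asset
from dagster import (
    AssetCheckResult,
    AssetExecutionContext,
    DailyPartitionsDefinition,
    asset,
    asset_check,
)

daily = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=daily)
def gcs_extract_table1(context: AssetExecutionContext):
    # Extract only the rows for this partition's date, enabling incremental loads
    context.log.info(f"Extracting rows for {context.partition_key}")

@asset_check(asset=gcs_extract_table1)
def extract_is_not_empty():
    # Replace with a real row-count check against the extract in GCS
    row_count = 1
    return AssetCheckResult(passed=row_count > 0)

The full example files follow.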
# dagster_mock_many_tables.py
from dagster import (
    asset,
    Definitions,
    define_asset_job,
    ScheduleDefinition,
    SourceAsset,
    AssetExecutionContext,
    AssetSelection,
    AssetKey,
)
import itertools


# Mock functions that implement the core logic, drop in from your existing codebase
def get_tables():
    """Some function that returns a list of dicts containing table and schema info"""
    return [{"table": "table1", "schema": "schema1"}, {"table": "table2", "schema": "schema2"}]


def sql_to_gcs_bucket(table_name, gcs_bucket):
    """Some function that takes a given SQL table and creates a corresponding CSV in a GCS bucket"""
    return gcs_bucket + "/" + table_name


def gcs_bucket_to_bq(table_name, gcs_file, bq_dataset):
    """Some function that loads a table into BQ from a GCS bucket"""
    # Used in place of the black-box GCSToBQOperator
    # Similar to https://medium.com/@bhaveshpatil0078/loading-csv-file-from-gcs-to-bigquery-using-python-9c646d4e884f
    # but gives you more flexibility, e.g. to swap in a BQ "external table" implementation
    return bq_dataset + "." + table_name


def flatten(list_of_lists):
    """Helper utility to flatten a list of lists"""
    return list(itertools.chain.from_iterable(list_of_lists))


########################
# A dynamic Dagster pipeline for multiple tables
########################

GCS_BUCKET = "gcs://my_bucket"
BQ_DATASET = "my_dataset"


def table_to_bq_factory(table_name, gcs_bucket, bq_dataset):
    """A factory that creates the extract-and-load assets for one table"""
    assets = []

    # Optional: a source asset represents the upstream SQL Server table in the lineage graph
    table_source = SourceAsset(
        key=["sql_server", table_name],
        description=f"A representation of the SQL Server source data for table: {table_name}",
    )
    assets.append(table_source)

    @asset(
        key=["gcs_extract", table_name],
        deps=[AssetKey(["sql_server", table_name])],
        description=f"The CSV extract of {table_name} in GCS",
    )
    def table_extract(context: AssetExecutionContext):
        result = sql_to_gcs_bucket(table_name, gcs_bucket)
        context.log.info(f"Extracted {table_name} to {result}")
        context.add_output_metadata({
            "table": table_name,
            "gcs_location": result,
        })

    assets.append(table_extract)

    @asset(
        key=["bq", table_name],
        deps=[AssetKey(["gcs_extract", table_name])],
        description=f"The BigQuery table loaded from the GCS extract of {table_name}",
    )
    def table_bq(context: AssetExecutionContext):
        result = gcs_bucket_to_bq(table_name, gcs_bucket + "/" + table_name, bq_dataset)
        context.log.info(f"Loaded {table_name} to {result}")
        context.add_output_metadata({
            "table": table_name,
            "bq_table": result,
        })

    assets.append(table_bq)
    return assets


all_assets = []
tables = get_tables()
for table in tables:
    assets_for_this_table = table_to_bq_factory(table["table"], GCS_BUCKET, BQ_DATASET)
    all_assets.append(assets_for_this_table)

# A job to run everything; more granular jobs could be created instead
run_all_pipelines_daily = ScheduleDefinition(
    name="run_all_pipelines_daily",
    cron_schedule="0 8 * * *",
    job=define_asset_job(name="run_all_pipelines", selection=AssetSelection.all()),
)

defs = Definitions(
    assets=[*flatten(all_assets)],
    schedules=[run_all_pipelines_daily],
)
# dagster_mock_one_table.py
from dagster import (
    asset,
    Definitions,
    define_asset_job,
    ScheduleDefinition,
    SourceAsset,
    AssetExecutionContext,
    AssetSelection,
)


# Mock functions that implement the core logic, drop in from your existing codebase
def get_tables():
    """Some function that returns a list of dicts containing table and schema info"""
    return [{"table": "table1", "schema": "schema1"}, {"table": "table2", "schema": "schema2"}]


def sql_to_gcs_bucket(table_name, gcs_bucket):
    """Some function that takes a given SQL table and creates a corresponding CSV in a GCS bucket"""
    return gcs_bucket + "/" + table_name


def gcs_bucket_to_bq(table_name, gcs_file, bq_dataset):
    """Some function that loads a table into BQ from a GCS bucket"""
    # Used in place of the black-box GCSToBQOperator
    # Similar to https://medium.com/@bhaveshpatil0078/loading-csv-file-from-gcs-to-bigquery-using-python-9c646d4e884f
    # but gives you more flexibility, e.g. to swap in a BQ "external table" implementation
    return bq_dataset + "." + table_name


########################
# A Dagster pipeline for one table
########################

GCS_BUCKET = "gcs://my_bucket"
BQ_DATASET = "my_dataset"

tables = get_tables()
first_table = tables[0]

# Optional: a source asset represents the underlying SQL Server table in the data lineage
# graph but does not affect how the pipeline is run
first_table_source = SourceAsset(
    key=["sql_server", first_table["table"]],
    description="A representation of the SQL Server source data",
)


@asset(deps=[first_table_source])
def first_table_extract(context: AssetExecutionContext):
    """The CSV extract of our table in GCS"""
    table = first_table["table"]
    result = sql_to_gcs_bucket(table, GCS_BUCKET)
    # Dagster lets you add metadata and logging
    context.log.info(f"Extracted {table} to {result}")
    context.add_output_metadata({
        "table": table,
        "gcs_location": result,
        "schema": first_table["schema"],
    })


@asset(deps=[first_table_extract])
def first_table_bq(context: AssetExecutionContext):
    """The BQ table"""
    table = first_table["table"]
    result = gcs_bucket_to_bq(table, GCS_BUCKET + "/" + table, BQ_DATASET)
    context.log.info(f"Loaded GCS file for {table} to BQ: {result}")
    context.add_output_metadata({
        "table": table,
        "BQ": result,
    })


# Create a scheduled job that targets these assets
run_pipeline_daily = ScheduleDefinition(
    name="run_pipeline_daily",
    cron_schedule="0 8 * * *",
    job=define_asset_job(name="run_pipeline", selection=AssetSelection.all()),
)

defs = Definitions(
    assets=[first_table_source, first_table_extract, first_table_bq],
    schedules=[run_pipeline_daily],
)