How to do multi-threaded / parallel operations with dbt

@jeremyyeo · Last active March 26, 2024

🚨 WARNING 🚨
Just because you could does not mean you should!

The examples below use PostgreSQL; however, the same idea applies across adapters (Snowflake, etc.).

To run arbitrary SQL commands with dbt, we would typically use hooks or operations. Imagine for a second that we have to run an operation that performs some action multiple times:

insert into foo_1 ...;
insert into foo_2 ...;
insert into foo_3 ...;
insert into foo_4 ...;
insert into foo_5 ...;
insert into foo_6 ...;

If we ran those SQL statements in a single run-operation, they would be executed serially. Let's look at an example.

First, let's create a function that simulates a "slow" operation:

create or replace function wait_then_return(t integer)
returns integer as $$
begin
  perform pg_sleep(t); -- sleep for t seconds.
  return t;            -- return t.
end;
$$ language plpgsql;

With that function created, any query that calls it takes t seconds to execute:

postgres=# select wait_then_return(10) as c;
 c  
----
 10
(1 row)

Time: 10012.288 ms (00:10.012)

postgres=# select wait_then_return(10) as c; select wait_then_return(10) as c; select wait_then_return(10) as c;
 c  
----
 10
(1 row)

Time: 10012.910 ms (00:10.013)
 c  
----
 10
(1 row)

Time: 10012.328 ms (00:10.012)
 c  
----
 10
(1 row)

Time: 10012.681 ms (00:10.013)

In a dbt project, things would look something like:

# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: "1.0.0"
-- macros/op.sql
{% macro op() %}
    {% do log('Procedure starting: ' ~ modules.datetime.datetime.now(), true) %}
    {% set query %}
        select wait_then_return(10) as c;
        select wait_then_return(10) as c;
        select wait_then_return(10) as c;
        select wait_then_return(10) as c;
        select wait_then_return(10) as c;
        select wait_then_return(10) as c;
    {% endset %}
    {% do run_query(query) %}
    {% do log('Procedure finished: ' ~  modules.datetime.datetime.now(), true) %}
{% endmacro %}
$ dbt run-operation op
01:30:25  Running with dbt=1.6.9
01:30:25  Registered adapter: postgres=1.6.9
01:30:26  Found 0 sources, 0 exposures, 0 metrics, 353 macros, 0 groups, 0 semantic models
01:30:26  Procedure starting: 2023-11-24 14:30:26.350642
01:31:26  Procedure finished: 2023-11-24 14:31:26.425314

We can see it took 1 minute to execute those 6 queries serially. What if we want to run them in parallel? dbt has threads; however, run-operations cannot be multi-threaded. Only the building of models can take advantage of multiple threads.

Since dbt can build multiple models at the same time, let's use that to our advantage.

  1. Create a new materialization:
-- macros/m13n.sql
{% materialization m13n, default %}
    {%- set identifier = model['alias'] -%}
    {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, database=database, type='view') -%}
    {% call statement('main') -%}
        {{ sql }}
    {%- endcall %}
    {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

This materialization does nothing except run the templated SQL statement {{ sql }}, which comes from the SQL text in a model's .sql file.
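As an aside, instead of configuring the materialization project-wide (as in the next step), a single model could also opt in via its own config block; a minimal sketch:

-- models/foo_1.sql
{{ config(materialized='m13n') }}
select wait_then_return(10) as c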

  2. Create multiple models that all use our custom materialization:
# dbt_project.yml
name: my_dbt_project
profile: all
config-version: 2
version: "1.0.0"

models:
  my_dbt_project:
    +materialized: m13n
    
on-run-start: "{% do log('Procedure starting: ' ~ modules.datetime.datetime.now(), true) %}"
on-run-end: "{% do log('Procedure finished: ' ~  modules.datetime.datetime.now(), true) %}"

I've added some on-run-x hooks so we can see some timestamps being logged.

-- models/foo_1.sql
select wait_then_return(10) as c
-- models/foo_2.sql
select wait_then_return(10) as c
-- models/foo_3.sql
select wait_then_return(10) as c
-- models/foo_4.sql
select wait_then_return(10) as c
-- models/foo_5.sql
select wait_then_return(10) as c
-- models/foo_6.sql
select wait_then_return(10) as c

And now let's run:

$ dbt run --threads 1
01:43:48  Running with dbt=1.6.9
01:43:48  Registered adapter: postgres=1.6.9
01:43:49  Procedure starting: 2023-11-24 14:43:49.342662
01:43:49  Procedure finished: 2023-11-24 14:43:49.350480
01:43:49  Found 6 models, 2 operations, 0 sources, 0 exposures, 0 metrics, 354 macros, 0 groups, 0 semantic models
01:43:49  
01:43:49  
01:43:49  Running 1 on-run-start hook
01:43:49  Procedure starting: 2023-11-24 14:43:49.601148
01:43:49  1 of 1 START hook: my_dbt_project.on-run-start.0 ............................... [RUN]
01:43:49  1 of 1 OK hook: my_dbt_project.on-run-start.0 .................................. [OK in 0.00s]
01:43:49  
01:43:49  Concurrency: 1 threads (target='pg-local')
01:43:49  
01:43:49  1 of 6 START sql m13n model public.foo_1 ....................................... [RUN]
01:43:59  1 of 6 OK created sql m13n model public.foo_1 .................................. [SELECT 1 in 10.06s]
01:43:59  2 of 6 START sql m13n model public.foo_2 ....................................... [RUN]
01:44:09  2 of 6 OK created sql m13n model public.foo_2 .................................. [SELECT 1 in 10.06s]
01:44:09  3 of 6 START sql m13n model public.foo_3 ....................................... [RUN]
01:44:19  3 of 6 OK created sql m13n model public.foo_3 .................................. [SELECT 1 in 10.05s]
01:44:19  4 of 6 START sql m13n model public.foo_4 ....................................... [RUN]
01:44:29  4 of 6 OK created sql m13n model public.foo_4 .................................. [SELECT 1 in 10.06s]
01:44:29  5 of 6 START sql m13n model public.foo_5 ....................................... [RUN]
01:44:39  5 of 6 OK created sql m13n model public.foo_5 .................................. [SELECT 1 in 10.05s]
01:44:39  6 of 6 START sql m13n model public.foo_6 ....................................... [RUN]
01:44:49  6 of 6 OK created sql m13n model public.foo_6 .................................. [SELECT 1 in 10.05s]
01:44:50  
01:44:50  Running 1 on-run-end hook
01:44:50  Procedure finished: 2023-11-24 14:44:50.018001
01:44:50  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
01:44:50  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
01:44:50  
01:44:50  
01:44:50  Finished running 6 m13n models, 2 hooks in 0 hours 1 minutes and 0.60 seconds (60.60s).
01:44:50  
01:44:50  Completed successfully
01:44:50  
01:44:50  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

$ dbt run --threads 6
01:45:13  Running with dbt=1.6.9
01:45:13  Registered adapter: postgres=1.6.9
01:45:14  Procedure starting: 2023-11-24 14:45:14.079126
01:45:14  Procedure finished: 2023-11-24 14:45:14.087990
01:45:14  Found 6 models, 2 operations, 0 sources, 0 exposures, 0 metrics, 354 macros, 0 groups, 0 semantic models
01:45:14  
01:45:14  
01:45:14  Running 1 on-run-start hook
01:45:14  Procedure starting: 2023-11-24 14:45:14.325350
01:45:14  1 of 1 START hook: my_dbt_project.on-run-start.0 ............................... [RUN]
01:45:14  1 of 1 OK hook: my_dbt_project.on-run-start.0 .................................. [OK in 0.00s]
01:45:14  
01:45:14  Concurrency: 6 threads (target='pg-local')
01:45:14  
01:45:14  1 of 6 START sql m13n model public.foo_1 ....................................... [RUN]
01:45:14  2 of 6 START sql m13n model public.foo_2 ....................................... [RUN]
01:45:14  3 of 6 START sql m13n model public.foo_3 ....................................... [RUN]
01:45:14  4 of 6 START sql m13n model public.foo_4 ....................................... [RUN]
01:45:14  5 of 6 START sql m13n model public.foo_5 ....................................... [RUN]
01:45:14  6 of 6 START sql m13n model public.foo_6 ....................................... [RUN]
01:45:24  2 of 6 OK created sql m13n model public.foo_2 .................................. [SELECT 1 in 10.33s]
01:45:24  1 of 6 OK created sql m13n model public.foo_1 .................................. [SELECT 1 in 10.34s]
01:45:24  4 of 6 OK created sql m13n model public.foo_4 .................................. [SELECT 1 in 10.33s]
01:45:24  6 of 6 OK created sql m13n model public.foo_6 .................................. [SELECT 1 in 10.32s]
01:45:24  3 of 6 OK created sql m13n model public.foo_3 .................................. [SELECT 1 in 10.34s]
01:45:24  5 of 6 OK created sql m13n model public.foo_5 .................................. [SELECT 1 in 10.33s]
01:45:24  
01:45:24  Running 1 on-run-end hook
01:45:24  Procedure finished: 2023-11-24 14:45:24.739074
01:45:24  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
01:45:24  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
01:45:24  
01:45:24  
01:45:24  Finished running 6 m13n models, 2 hooks in 0 hours 0 minutes and 10.58 seconds (10.58s).
01:45:24  
01:45:24  Completed successfully
01:45:24  
01:45:24  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

$ dbt run --threads 3
01:45:38  Running with dbt=1.6.9
01:45:38  Registered adapter: postgres=1.6.9
01:45:39  Procedure starting: 2023-11-24 14:45:39.289995
01:45:39  Procedure finished: 2023-11-24 14:45:39.297111
01:45:39  Found 6 models, 2 operations, 0 sources, 0 exposures, 0 metrics, 354 macros, 0 groups, 0 semantic models
01:45:39  
01:45:39  
01:45:39  Running 1 on-run-start hook
01:45:39  Procedure starting: 2023-11-24 14:45:39.492161
01:45:39  1 of 1 START hook: my_dbt_project.on-run-start.0 ............................... [RUN]
01:45:39  1 of 1 OK hook: my_dbt_project.on-run-start.0 .................................. [OK in 0.00s]
01:45:39  
01:45:39  Concurrency: 3 threads (target='pg-local')
01:45:39  
01:45:39  1 of 6 START sql m13n model public.foo_1 ....................................... [RUN]
01:45:39  2 of 6 START sql m13n model public.foo_2 ....................................... [RUN]
01:45:39  3 of 6 START sql m13n model public.foo_3 ....................................... [RUN]
01:45:49  1 of 6 OK created sql m13n model public.foo_1 .................................. [SELECT 1 in 10.09s]
01:45:49  3 of 6 OK created sql m13n model public.foo_3 .................................. [SELECT 1 in 10.09s]
01:45:49  2 of 6 OK created sql m13n model public.foo_2 .................................. [SELECT 1 in 10.09s]
01:45:49  4 of 6 START sql m13n model public.foo_4 ....................................... [RUN]
01:45:49  5 of 6 START sql m13n model public.foo_5 ....................................... [RUN]
01:45:49  6 of 6 START sql m13n model public.foo_6 ....................................... [RUN]
01:45:59  4 of 6 OK created sql m13n model public.foo_4 .................................. [SELECT 1 in 10.10s]
01:45:59  5 of 6 OK created sql m13n model public.foo_5 .................................. [SELECT 1 in 10.10s]
01:45:59  6 of 6 OK created sql m13n model public.foo_6 .................................. [SELECT 1 in 10.10s]
01:45:59  
01:45:59  Running 1 on-run-end hook
01:45:59  Procedure finished: 2023-11-24 14:45:59.751741
01:45:59  1 of 1 START hook: my_dbt_project.on-run-end.0 ................................. [RUN]
01:45:59  1 of 1 OK hook: my_dbt_project.on-run-end.0 .................................... [OK in 0.00s]
01:45:59  
01:45:59  
01:45:59  Finished running 6 m13n models, 2 hooks in 0 hours 0 minutes and 20.39 seconds (20.39s).
01:45:59  
01:45:59  Completed successfully
01:45:59  
01:45:59  Done. PASS=6 WARN=0 ERROR=0 SKIP=0 TOTAL=6

By controlling the number of threads, we control the number of "models" dbt executes at any one time. When we set that equal to the number of models (6), the whole run took roughly as long as a single execution (~10 seconds) instead of ~1 minute:

01:45:24  Finished running 6 m13n models, 2 hooks in 0 hours 0 minutes and 10.58 seconds (10.58s).
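
As an aside, besides the --threads CLI flag, the thread count can also be set on the target in profiles.yml; a minimal sketch (the connection details are placeholders):

# profiles.yml
all:
  target: pg-local
  outputs:
    pg-local:
      type: postgres
      host: localhost      # placeholder connection details
      user: postgres
      password: postgres
      port: 5432
      dbname: postgres
      schema: public
      threads: 6           # default thread count for this target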

We could of course achieve the same outcome with normal table/view materializations plus pre/post-hooks, but then I would also have to drop those models after they were created, since I want to keep my database clear of random tables/views.
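For illustration, a minimal sketch of that hook-based alternative (assuming the Postgres adapter; note the EntityNotFoundException discussion in the comments below, where dropping the view in a post-hook caused trouble on dbt-athena):

-- models/foo_1.sql
{{ config(materialized='view', post_hook='drop view if exists {{ this }}') }}
select wait_then_return(10) as c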

One caveat with this pattern is that you end up with random nodes showing up in your dbt DAG (even if we choose to hide those nodes via show: false):

[image: dbt DAG showing the foo_* nodes]
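
For reference, hiding the nodes from the generated docs looks something like this (the models still run and still appear in the DAG itself):

# dbt_project.yml
models:
  my_dbt_project:
    +materialized: m13n
    +docs:
      show: false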

@juan-yunis

@ezequielscott I think the idea is to only run the SQL without creating a table for it; the new materialization exists just to run the SQL without creating a table named after the model file.

@SoumayaMauthoorMOJ

SoumayaMauthoorMOJ commented Mar 8, 2024

How about using the view materialization, giving the model files unique names, and then doing the insert as a post-hook? You can also delete the view in a post-hook if you don't like having lots of unnecessary views. All the view models will then be able to run and insert data concurrently.
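
A minimal sketch of that suggestion (the destination table and insert statement are hypothetical):

-- models/insert_part_1.sql
{{ config(
    materialized='view',
    post_hook=[
        "insert into my_destination_table select wait_then_return(10) as c",  -- hypothetical insert
        "drop view if exists {{ this }}"
    ]
) }}
select 1 as placeholder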

@pcc-mtiner

I just ran into a similar problem myself this week; here is how my approach differs:

  • Rather than create a custom materialization, I created a macro with the logic for each query I wanted to parallelize (each different query gets its own macro), which then accepted a 'partition' parameter so the same macro could be reused for the various splits of one query. A sketch of this idea follows the list.
  • This macro would then be called as a pre/post hook on however many splits you want to do. As Soumaya mentioned above me, these can just be called from models that have 'view' materialization (along with some dummy model definition like 'SELECT 1'), followed by dropping these views in a hook after they finish. I think I could probably adapt your custom materialization here to avoid the need to create/drop the view.
  • I also needed one model that is a sort of "pre-model". Its only purpose is to create the table if it does not exist, and to truncate (or run other cleanup operations) if necessary, as I'm not sure these processes could be cleanly left to the parallel threads. If anything, I figured it would be clearer to have a first model step that makes sure the required destination tables are in place before kicking off the various parallel view models. The 'depends_on' tag can be used to ensure the pre-model step runs before the parallel steps.
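
A minimal sketch of the per-partition macro idea from the first bullet (macro name, table names, and partitioning column are all hypothetical; the macro returns the SQL for the hook to execute):

-- macros/insert_partition.sql
{% macro insert_partition(partition) %}
    insert into my_destination_table
    select * from my_source_table
    where partition_key = '{{ partition }}'  -- hypothetical partitioning column
{% endmacro %}

-- models/part_1.sql
{{ config(materialized='view', pre_hook="{{ insert_partition('1') }}") }}
select 1 as placeholder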

All in all, my final conclusion is that while it works, it's a bit more convoluted than I would like, and I'm thinking it could be a massive value-add for dbt if they formalized this directly. I feel like it should be possible to enable some type of configuration that lets us define a single model file, along with some config that defines how many parts to run it in and what those parts are. Potentially an additional config could limit the maximum number of concurrent parts (for example, you might have 16 parts but only want to run 2 at a time because they are expensive). It seems there are many occasions when joins can be localized to key relationships that can cleanly be broken into chunks, which allows much more flexibility in memory usage (as well as helping maximize concurrent use of compute resources). This would be so amazing for my team if we could toggle it on for any model by just setting a few configs.

@SoumayaMauthoorMOJ

@pcc-mtiner You've mentioned a tag; I assume you mean -- depends_on: {{ ref('<model>') }}? Do you have any thoughts about how to pass this dynamically when you have a large number of inserts? I've tried all the suggestions here but no luck.

@SoumayaMauthoorMOJ

SoumayaMauthoorMOJ commented Mar 17, 2024

Also, it seems you can't delete the view as a post-hook, because dbt expects the entity to be available:

Unhandled error while executing target/run/dbt_athena_iceberg_datalake/models/<model>.sql
An error occurred (EntityNotFoundException) when calling the GetTableVersions operation: Table <model> not found.

@pcc-mtiner

@pcc-mtiner You've mentioned a tag, I assume you mean -- depends_on: {{ ref('<model>') }}? Do you have any thoughts about how to pass this dynamically in the case where you have a large number of inserts? I've tried all the suggestions here but no luck.

That's correct, I was talking about that feature. So far I haven't found a way to make it dynamic, so if I want to expand/reduce the number of parallel splits it can be a little tedious.
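
For readers unfamiliar with the feature, the static form being discussed looks like this (model and table names are hypothetical):

-- models/run_finalization.sql
-- depends_on: {{ ref('part_1') }}
-- depends_on: {{ ref('part_2') }}
select * from my_destination_table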

Also it seems you can't delete the view as a post-hook because dbt expects the entity to be available:

Unhandled error while executing target/run/dbt_athena_iceberg_datalake/models/<model>.sql
An error occurred (EntityNotFoundException) when calling the GetTableVersions operation: Table <model> not found.

I haven't run into this yet myself; running several views in parallel and then dropping the views worked for my case. I suspect it might have something to do with trying to use the model named by the dropped view in some downstream dependency. I think I have an idea that might accommodate this. The main process would consist of three main steps:

  1. Start with a 'run-initialization' view model that creates your destination table (or does any cleanup tasks), then drops its view in a post-hook. The destination table will be created by manually calling DDL using the dbt run_query macro. This table should have a different name than the run-initialization model itself.
  2. Run several parallel view models that all call a macro to insert into the table created in step 1, with post-hooks to drop each view afterwards. Each model should be dependent on the initialization model, to ensure the table is ready to receive records.
  3. Have a 'run-finalization' model. This model should have the same name as the destination table created in step 1; that way you can reference the destination table in downstream models. Since the table is already created in step 1, you will want to set some type of materialization that won't try to recreate the table in this last step. You can either create a custom materialization, or perhaps use the 'incremental' type, since this won't try to alter the table if it already exists. It could then just run some simple script that won't actually insert any further records. -- depends_on: tags could also be consolidated in this one model if you want to add/remove parallel splits.

You may potentially need to do all the DROP VIEW post-hooks in the run-finalization model, rather than in the individual models. Hopefully that helps get you through your issue! A rough sketch of the step-1 idea follows.
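
A minimal sketch of the run-initialization model from step 1 (the table name and column are hypothetical; assumes Postgres-style DDL):

-- models/run_initialization.sql
{{ config(
    materialized='view',
    pre_hook="create table if not exists my_destination_table (c integer)",  -- hypothetical DDL
    post_hook="drop view if exists {{ this }}"
) }}
select 1 as placeholder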

@SoumayaMauthoorMOJ

SoumayaMauthoorMOJ commented Mar 18, 2024

@pcc-mtiner thanks for confirming, I won't spend more time making depends_on dynamic. As for the deleted view, it's good to know it's possible; I'll check to see where my code might be calling it.

@SoumayaMauthoorMOJ

SoumayaMauthoorMOJ commented Mar 18, 2024

@pcc-mtiner what version of dbt are you using? I downgraded from dbt-core 1.7.8 to 1.6.5 and the EntityNotFoundException disappeared. I'm using dbt-athena as my adapter and downgraded it from 1.7.2 to 1.6.2. I'm wondering whether to raise this with dbt-athena or dbt-core, so it would be helpful to know if you also get the error with dbt-core 1.7.8.

@pcc-mtiner

@SoumayaMauthoorMOJ apologies, I missed replying to this; here are all the dbt-related packages in my environment:
dbt-clickhouse 1.7.1
dbt-core 1.7.10
dbt-extractor 0.5.1
dbt-metricflow 0.4.0
dbt-semantic-interfaces 0.4.4

@SoumayaMauthoorMOJ

Thanks, it must be a problem with dbt-athena in that case.
