Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Script for backfilling DBT models which use partitions
#!/usr/bin/env python
import argparse
import copy
import json
import os
import sys
from datetime import datetime, timedelta
from subprocess import Popen, PIPE
# dictionary of partition names to variable-generating functions
def week_partitions(x):
    """
    Build the partition variables for the week that contains ``x``.

    ``x`` is first moved back to the most recent Sunday (``isoweekday() % 7``
    is 0 on a Sunday and 6 on a Saturday), so every day from that Sunday
    through the following Saturday produces the same values. The ``week``
    and ``part`` entries are formatted with ``%W`` applied to that Sunday.
    """
    days_since_sunday = x.isoweekday() % 7
    week_start = x - timedelta(days=days_since_sunday)
    field_formats = (
        ('year', '%Y'),
        ('month', '%m'),
        ('day', '%d'),
        ('week', '%W'),
        ('part', '%Y_%W'),
    )
    return {field: week_start.strftime(fmt) for field, fmt in field_formats}
def _strftime_fields(moment, field_formats):
    """
    Format ``moment`` once per ``(name, strftime pattern)`` pair.
    """
    return {name: moment.strftime(pattern) for name, pattern in field_formats}


# Maps each supported partition name to a callable that turns a date into
# the variables for that partition; every result carries a 'part' entry.
PARTITIONS = {
    'day': lambda x: _strftime_fields(
        x, (('year', '%Y'), ('month', '%m'), ('day', '%d'), ('part', '%Y_%m_%d'))),
    # looked up lazily so that week_partitions is only needed when called
    'week': lambda x: week_partitions(x),
    'month': lambda x: _strftime_fields(
        x, (('year', '%Y'), ('month', '%m'), ('part', '%Y_%m'))),
    'year': lambda x: _strftime_fields(
        x, (('year', '%Y'), ('part', '%Y'))),
}
class DbtBackfill(object):
    """
    Backfill a partitioned dbt model by running ``dbt run`` once per partition.

    The inclusive range ``from_date``..``to_date`` is walked one day at a
    time. Each day is turned into partition variables through ``PARTITIONS``
    and every distinct partition (identified by its ``part`` value) yields
    one dbt invocation, with the variables passed as JSON via ``--vars``.
    """

    def __init__(self,
                 model,
                 target,
                 from_date,
                 to_date=None,
                 partition='month',
                 var_prefix="",
                 profiles_dir="~/.dbt/",
                 static_vars=None):
        """
        Initialize parameters for backfill job.

        model:        value passed to dbt's ``--models`` option.
        target:       value passed to dbt's ``--target`` option.
        from_date:    first day of the backfill (date or datetime).
        to_date:      last day of the backfill, inclusive (date or datetime).
                      Defaults to the current UTC time, evaluated when the
                      instance is created rather than at import time.
        partition:    a key of ``PARTITIONS``.
        var_prefix:   optional prefix added as ``<prefix>_<name>`` to every
                      variable name, including those in ``static_vars``.
        profiles_dir: value passed to dbt's ``--profiles-dir`` option.
        static_vars:  mapping of extra variables sent with every run.

        Raises ValueError if ``partition`` is not a key of ``PARTITIONS``.
        """
        # check for valid partitions before storing anything
        if partition not in PARTITIONS:
            raise ValueError("'partition' must be one of: {0}".format(", ".join(PARTITIONS.keys())))
        if static_vars is None:
            static_vars = {}
        self.model = model
        self.target = target
        self.from_date = from_date
        self.to_date = datetime.utcnow() if to_date is None else to_date
        self.partition = partition
        self.var_prefix = var_prefix
        self.profiles_dir = profiles_dir
        # var_prefix must already be set: _gen_var_key reads it
        self.static_vars = {self._gen_var_key(k): v for k, v in static_vars.items()}

    def _gen_var_key(self, k):
        """
        Add a prefix to a key.
        """
        if self.var_prefix == "":
            return k
        return "{}_{}".format(self.var_prefix, k)

    def _gen_vars(self, partition):
        """
        Format vars as JSON: the static vars overlaid with one partition's vars.
        """
        # work on a copy so self.static_vars is never mutated
        merged = dict(self.static_vars)
        merged.update(partition)
        return json.dumps(merged)

    def _gen_command(self, partition):
        """
        Generate the dbt command (an argument list) for one partition.
        """
        return [
            "dbt", "run",
            # the command is executed without a shell, so a leading "~"
            # has to be expanded here
            "--profiles-dir", os.path.expanduser(self.profiles_dir),
            "--models", self.model,
            "--target", self.target,
            "--vars", self._gen_vars(partition)
        ]

    def _run_command(self, cmd):
        """
        Execute a dbt command, echoing its output to stderr.

        If the command exits with a non-zero status, the interpreter is
        terminated through ``sys.exit`` with that same status.
        """
        sys.stderr.write("{0} | Backfill command: {1}\n".format(
            datetime.now().strftime('%H:%M:%S'), " ".join(cmd)))
        process = Popen(cmd, stdout=PIPE, stderr=PIPE)
        stdout, stderr = process.communicate()
        sys.stderr.write(stdout.decode('utf-8', 'replace'))
        # also surface the captured stderr, otherwise failures are silent
        sys.stderr.write(stderr.decode('utf-8', 'replace'))
        if process.returncode != 0:
            sys.exit(process.returncode)

    @staticmethod
    def _as_date(value):
        """
        Reduce a datetime to its calendar date; return anything else unchanged.
        """
        if isinstance(value, datetime):
            return value.date()
        return value

    @property
    def partitions(self):
        """
        A list of partitions to build.

        Each entry is a dict of (prefixed) variable names to values. Entries
        appear in date order and each distinct ``part`` value appears once.
        """
        # Compare calendar dates so date and datetime bounds can be mixed and
        # a partial final day is still included.
        first_day = self._as_date(self.from_date)
        last_day = self._as_date(self.to_date)
        make_vars = PARTITIONS[self.partition]
        seen = set()
        parts = []
        for offset in range((last_day - first_day).days + 1):
            day_vars = make_vars(first_day + timedelta(days=offset))
            if day_vars['part'] in seen:
                continue
            seen.add(day_vars['part'])
            parts.append({self._gen_var_key(k): v for k, v in day_vars.items()})
        return parts

    @property
    def commands(self):
        """
        A list of commands to run for each partition
        """
        return [self._gen_command(part) for part in self.partitions]

    def run(self):
        """
        Run all partitions, in order; returns one entry per command run.
        """
        return [self._run_command(cmd) for cmd in self.commands]
# function for parsing cli-input date
def cli_format_date(x):
    """
    Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Raises ValueError when ``x`` does not match that format. This is a named
    function rather than a lambda because argparse puts the ``type``
    callable's ``__name__`` in its "invalid ... value" error message.
    """
    return datetime.strptime(x, '%Y-%m-%d').date()
def cli(argv=None):
    """
    Command line interface.

    Parses the options, builds a ``DbtBackfill`` from them and runs it.

    argv: optional list of argument strings; when None, argparse reads
          ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser(prog="backfill-model")
    parser.add_argument("--model", required=True,
                        help="The model to backfill.")
    parser.add_argument("--target", default='dev',
                        help="The target to run dbt with (default: dev).")
    parser.add_argument("--from-date", required=True,
                        type=cli_format_date,
                        help="The date at which to start the backfill.")
    parser.add_argument("--to-date", required=True,
                        type=cli_format_date,
                        help="The date at which to end the backfill.")
    # choices makes an unknown partition a usage error instead of a traceback
    parser.add_argument("--partition", required=True,
                        choices=sorted(PARTITIONS),
                        help="The unit to partition backfill jobs by (day/week/month/year).")
    parser.add_argument("--var-prefix", default="",
                        help="A prefix to add to each variable passed to dbt (day => prefix_day)")
    parser.add_argument("--profiles-dir", default="~/.dbt/",
                        help="The directory that your dbt profile is stored in.")
    parser.add_argument("--static-vars", default="{}", type=json.loads,
                        help="static variables (formatted as JSON) to pass to dbt's --vars option. "
                             "--var-prefix will apply to these variables as well")
    args = parser.parse_args(argv)
    backfill = DbtBackfill(
        model=args.model,
        target=args.target,
        from_date=args.from_date,
        to_date=args.to_date,
        partition=args.partition,
        var_prefix=args.var_prefix,
        static_vars=args.static_vars,
        profiles_dir=args.profiles_dir
    )
    backfill.run()


# run the command line interface only when executed as a script
if __name__ == '__main__':
    cli()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.