Skip to content

Instantly share code, notes, and snippets.

@mistercrunch
Created July 31, 2015 15:22
Show Gist options
  • Save mistercrunch/3c4b59918fb0c8664f30 to your computer and use it in GitHub Desktop.
Save mistercrunch/3c4b59918fb0c8664f30 to your computer and use it in GitHub Desktop.
def stage_check_exchange_subdag(task_id_prefix, template, params=None, dag=None):
    '''
    Build a sub-DAG that abstracts the common pattern of:
    * loading into a staging
    * performing data quality checks
    * exchange (publish) the partition into its final destination

    All three tasks are handed the same ``template``. Each task receives
    ``params`` plus three boolean flags (``stage``, ``check``, ``exchange``),
    exactly one of which is True, so the template can tell which phase it is
    being rendered for.

    :param task_id_prefix: prefix of the three task ids
        (``<prefix>_stage``, ``<prefix>_check``, ``<prefix>_exchange``);
        also appended to the parent's dag_id to form the sub-DAG's dag_id.
    :param template: statement passed as ``hql`` to both Hive operators and
        as ``sql`` to the Presto check operator.
    :param params: optional mapping of extra template params. It is not
        mutated; a falsy value is treated as an empty mapping.
    :param dag: parent DAG. Its dag_id, params, default_args,
        template_searchpath and user_defined_macros are reused for the
        sub-DAG.
    :return: the sub-DAG holding the chained stage -> check -> exchange tasks.
    :raises ValueError: if ``dag`` is not provided.
    '''
    if dag is None:
        raise ValueError('a parent `dag` is required to build the sub-DAG')
    params = params or {}

    def phase_params(phase):
        # Caller params first, then the phase flags, so the flags always win
        # (same precedence as concatenating the item lists). Avoids the
        # Python 2-only ``iteritems``.
        flags = dict.fromkeys(('stage', 'check', 'exchange'), False)
        flags[phase] = True
        return dict(params, **flags)

    subdag = DAG(
        dag_id=dag.dag_id + '.' + task_id_prefix,
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros)

    stage = HiveOperator(
        task_id=task_id_prefix + '_stage',
        hql=template,
        dag=subdag,
        params=phase_params('stage'),
    )
    stage.ui_color = '#FCAFB2'

    check = PrestoCheckOperator(
        task_id=task_id_prefix + '_check',
        sql=template,
        dag=subdag,
        params=phase_params('check'),
    )
    check.set_upstream(stage)
    check.ui_color = '#87FF97'

    exchange = HiveOperator(
        task_id=task_id_prefix + '_exchange',
        hql=template,
        dag=subdag,
        params=phase_params('exchange'),
    )
    exchange.set_upstream(check)
    exchange.ui_color = '#6CC4CD'

    return subdag


# Name used by the original definition header; kept so references to it
# still resolve.
stage_check_exchange = stage_check_exchange_subdag
#-------------------------------------------------------------
# Build the stage -> check -> exchange sub-DAG for this table; the table name
# doubles as the task-id prefix inside the sub-DAG.
subdag = stage_check_exchange_subdag(
    table, template=template, params=table_params, dag=dag)

# Attach the sub-DAG to the parent DAG as a single task named after the
# table, run with a SequentialExecutor instance.
SubDagOperator(
    task_id=table, subdag=subdag, executor=SequentialExecutor(), dag=dag)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment