Created
December 4, 2015 19:05
-
-
Save mtustin-handy/675622063378ec73f861 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def make_spooq_exporter(table, schema, task_id, dag): | |
return SpooqExportOperator( | |
jdbc_url=('jdbc:mysql://%s/%s?user=user&password=pasta' | |
% (TARGET_DB_HOST,TARGET_DB_NAME)), | |
target_table=table, | |
hive_table='%s.%s' % (schema, table), | |
dag=dag, | |
on_retry_callback=truncate_db, | |
task_id=task_id) | |
def truncate_db(context): | |
hook = MySqlHook('clean_db_export') | |
hook.run( | |
'truncate `%s`'%context['task_instance'].task.target_table, | |
autocommit=False, | |
parameters=None) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Marcin,
I liked your Ariflow write up on Medium, I have referenced it regularly while writing my first Airflow DAG.
I was curious about the context in truncate_db():
is it the task instance returned in make_spooq_exporter (visible as Attributes in Task Details in the web UI)?
I'm trying to reference an op_kwarg in my retry function.
Thanks!