@mtustin-handy
Created December 4, 2015 19:05
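# SpooqExportOperator, MySqlHook, TARGET_DB_HOST and TARGET_DB_NAME are
# assumed to be imported or defined elsewhere in the DAG file.

def make_spooq_exporter(table, schema, task_id, dag):
    """Build an export task for `schema.table`, truncating the target on retry."""
    return SpooqExportOperator(
        jdbc_url=('jdbc:mysql://%s/%s?user=user&password=pasta'
                  % (TARGET_DB_HOST, TARGET_DB_NAME)),
        target_table=table,
        hive_table='%s.%s' % (schema, table),
        dag=dag,
        on_retry_callback=truncate_db,
        task_id=task_id)


def truncate_db(context):
    """Retry callback: empty the task's target table before the next attempt."""
    hook = MySqlHook('clean_db_export')
    hook.run(
        # The operator is reachable through the task instance in the context.
        'truncate `%s`' % context['task_instance'].task.target_table,
        autocommit=False,
        parameters=None)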
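
The retry callback above receives a context dictionary and reaches the operator through `context['task_instance'].task`. The following is a minimal sketch of that lookup, runnable without Airflow. The `fake_task` and `fake_context` objects are hypothetical stand-ins, not Airflow classes, and `build_truncate_sql` and `read_op_kwarg` are illustrative helper names that do not appear in the gist. It assumes the operator stores its keyword arguments on an `op_kwargs` attribute, as `PythonOperator` does.

```python
from types import SimpleNamespace


def build_truncate_sql(context):
    """Build the TRUNCATE statement from the operator's target_table attribute."""
    task = context['task_instance'].task
    return 'truncate `%s`' % task.target_table


def read_op_kwarg(context, key, default=None):
    """Look up one entry of the operator's op_kwargs, if it has any."""
    task = context['task_instance'].task
    op_kwargs = getattr(task, 'op_kwargs', None) or {}
    return op_kwargs.get(key, default)


# Hypothetical stand-ins that mimic only the shape used above:
# a context dict whose 'task_instance' entry exposes a .task attribute.
fake_task = SimpleNamespace(target_table='bookings',
                            op_kwargs={'batch_size': 500})
fake_context = {'task_instance': SimpleNamespace(task=fake_task)}

print(build_truncate_sql(fake_context))
print(read_op_kwarg(fake_context, 'batch_size'))
```

Keeping the lookup in small functions like these makes the callback logic testable without a scheduler; in a real DAG the same attribute access would run against the context Airflow passes in.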
@yinleon
yinleon commented Oct 5, 2016

Hi Marcin,

I liked your Airflow write-up on Medium, and I have referenced it regularly while writing my first Airflow DAG.

I was curious about the context argument in truncate_db():
does it refer to the task instance for the operator returned by make_spooq_exporter (visible as Attributes under Task Details in the web UI)?

I'm trying to reference an op_kwarg in my retry function.

Thanks!