@yssharma
Last active July 14, 2017 04:04
Airflow DAG for range runs
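import datetime

import airflow
# Airflow 1.x import path; in Airflow 2 use
# airflow.providers.apache.hive.operators.hive.HiveOperator instead.
from airflow.operators.hive_operator import HiveOperator

# Check whether _MANUAL_OVERRIDE_START_DATE or _MANUAL_OVERRIDE_END_DATE
# was passed in the DAG run's conf. If not, START falls back to the day
# before ds (yesterday) and END to ds, which is the regular daily window.
# Both strings are Jinja templates that Airflow renders at task run time.
START = "{{dag_run.conf.get('_MANUAL_OVERRIDE_START_DATE', macros.ds_add(ds, -1)) if dag_run.conf else macros.ds_add(ds, -1)}}"
END = "{{dag_run.conf.get('_MANUAL_OVERRIDE_END_DATE', ds) if dag_run.conf else ds}}"

# str.format() only pastes the template strings into the HQL; the actual
# dates are filled in when Airflow renders the operator's templated `hql` field.
query = """
select
    day,
    unix_time,
    username,
    product,
    attribute
from huge_table
where attribute like '%interesting%'
  and day >= '{START}' and day <= '{END}'
""".format(START=START, END=END)

# Tasks cannot be attached to a DAG without a start_date; the date below
# is a placeholder, not part of the original gist.
# Note: "10 * * * *" fires every hour at minute 10; for one run per day
# use e.g. "10 0 * * *".
dag = airflow.DAG(
    "InterestingData",
    start_date=datetime.datetime(2017, 7, 1),
    schedule_interval="10 * * * *",
    max_active_runs=1,
)

# This is my HiveOperator. It should work the same with
# your own custom operator for Hive/Spark.
hive_operator = HiveOperator(
    task_id='get_interesting_data',
    hql=query,
    # HiveOperator's connection argument is hive_cli_conn_id;
    # an unknown hive_conn_id would fall back to 'hive_cli_default'.
    hive_cli_conn_id='emr_521_hive',
    dag=dag)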
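To check the override logic without running Airflow, the following minimal plain-Python sketch mirrors the START/END Jinja expressions. The helper names `ds_add` and `resolve_range` are hypothetical. `ds_add` imitates Airflow's `macros.ds_add` (shift a YYYY-MM-DD string by N days) instead of importing it, so the snippet runs without Airflow installed.

```python
from datetime import datetime, timedelta
from typing import Optional, Tuple


def ds_add(ds: str, days: int) -> str:
    """Hypothetical stand-in for macros.ds_add: shift a YYYY-MM-DD string by `days`."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


def resolve_range(conf: Optional[dict], ds: str) -> Tuple[str, str]:
    """Plain-Python equivalent of the START/END templates (hypothetical helper)."""
    if conf:
        # Manual run: each bound falls back independently if its key is missing.
        start = conf.get("_MANUAL_OVERRIDE_START_DATE", ds_add(ds, -1))
        end = conf.get("_MANUAL_OVERRIDE_END_DATE", ds)
    else:
        # Scheduled run (conf is None or empty): default daily window.
        start, end = ds_add(ds, -1), ds
    return start, end


print(resolve_range(None, "2017-07-14"))
# -> ('2017-07-13', '2017-07-14')
print(resolve_range({"_MANUAL_OVERRIDE_START_DATE": "2017-07-01",
                     "_MANUAL_OVERRIDE_END_DATE": "2017-07-10"}, "2017-07-14"))
# -> ('2017-07-01', '2017-07-10')
```

An empty conf (`{}`) is falsy, so it takes the same default path as a missing conf. A conf that sets only the start date still ends the window at `ds`.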
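The override dates must arrive in the DAG run's conf. With the Airflow 1.x CLI, you pass them through the `-c`/`--conf` option of `airflow trigger_dag`; Airflow 2 renamed the command to `airflow dags trigger`. The sketch below uses a hypothetical `trigger_command` helper that only builds the command string, with the JSON conf safely shell-quoted. It does not execute anything.

```python
import json
import shlex


def trigger_command(dag_id: str, start: str, end: str) -> str:
    """Hypothetical helper: build (not run) an Airflow 1.x trigger_dag call
    whose conf carries the manual override dates."""
    conf = {
        "_MANUAL_OVERRIDE_START_DATE": start,
        "_MANUAL_OVERRIDE_END_DATE": end,
    }
    # shlex.quote keeps the JSON (spaces, double quotes) as one shell argument.
    return "airflow trigger_dag -c {} {}".format(shlex.quote(json.dumps(conf)), dag_id)


print(trigger_command("InterestingData", "2017-07-01", "2017-07-10"))
```

Because `max_active_runs=1`, such a manual range run and the scheduled runs of this DAG do not execute concurrently.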