A toy example of a DAG definition file in Airflow
"""
A DAG docstring might be a good way to explain at a high level
what problem space the DAG is looking at.
Links to design documents, upstream dependencies etc
are highly recommended.
"""
from datetime import datetime, timedelta
from airflow.models import DAG # Import the DAG class
from airflow.operators.sensors import NamedHivePartitionSensor
from airflow.operators.hive_operator import HiveOperator
### You can import more operators as you see fit!
# from airflow.operators.bash_operator import BashOperator
# from airflow.operators.python_operator import PythonOperator
# Set some default arguments for the DAG
default_args = {
    'owner': 'you',
    'depends_on_past': False,
    'start_date': datetime(2018, 2, 9),
}
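# A hedged sketch: default_args are passed to every task in the DAG, and
# other common BaseOperator parameters (retries, retry_delay, email alerts)
# can be set here too. The values below are illustrative, not from the
# original gist.
# default_args = {
#     'owner': 'you',
#     'depends_on_past': False,
#     'start_date': datetime(2018, 2, 9),
#     'retries': 2,                         # retry each failed task twice
#     'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
#     'email_on_failure': True,
#     'email': ['you@example.com'],
# }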
# Instantiate the Airflow DAG
dag = DAG(
    dag_id='anatomy_of_a_dag',
    description="This describes my DAG",
    default_args=default_args,
    schedule_interval=timedelta(days=1))  # This is a daily DAG.
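# Note: schedule_interval also accepts a cron expression string. The
# commented sketch below is an equivalent daily schedule (illustrative,
# not part of the original gist).
# dag = DAG(
#     dag_id='anatomy_of_a_dag',
#     default_args=default_args,
#     schedule_interval='0 0 * * *')  # midnight UTC, once per day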
# Put upstream dependencies in a dictionary
wf_dependencies = {
    'wf_upstream_table_1': 'upstream_table_1/ds={{ ds }}',
    'wf_upstream_table_2': 'upstream_table_2/ds={{ ds }}',
    'wf_upstream_table_3': 'upstream_table_3/ds={{ ds }}',
}
# Define the sensors for upstream dependencies.
# Each operator registers itself with the DAG on construction,
# so no assignment is needed here.
for wf_task_id, partition_name in wf_dependencies.items():
    NamedHivePartitionSensor(
        task_id=wf_task_id,
        partition_names=[partition_name],
        dag=dag,
    )
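# Each sensor polls the Hive metastore until its partition lands. A hedged
# sketch of tuning that polling via poke_interval and timeout, which come
# from BaseSensorOperator (the values here are illustrative):
# NamedHivePartitionSensor(
#     task_id=wf_task_id,
#     partition_names=[partition_name],
#     poke_interval=300,    # check for the partition every 5 minutes
#     timeout=60 * 60 * 6,  # fail the sensor after 6 hours of waiting
#     dag=dag,
# )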
# Put the tasks in a list
tasks = [
    ('hql', 'task_1'),
    ('hql', 'task_2'),
]
# Define the operators from the list above
for directory, task_name in tasks:
    HiveOperator(
        task_id=task_name,
        hql='{0}/{1}.hql'.format(directory, task_name),
        dag=dag,
    )
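# Because 'hql' is a templated field and '.hql' is a templated file
# extension for HiveOperator, Airflow renders each script through Jinja
# before execution, so macros like {{ ds }} work inside the files.
# A hypothetical hql/task_1.hql (illustrative only) might contain:
#   INSERT OVERWRITE TABLE my_table PARTITION (ds = '{{ ds }}')
#   SELECT ... FROM upstream_table_1 WHERE ds = '{{ ds }}';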
# Put the dependencies in a map
deps = {
    'task_1': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
    ],
    'task_2': [
        'wf_upstream_table_1',
        'wf_upstream_table_2',
        'wf_upstream_table_3',
    ],
}
# Explicitly define the dependencies in the DAG
for downstream, upstream_list in deps.items():
    for upstream in upstream_list:
        dag.set_dependency(upstream, downstream)
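# An equivalent sketch using Airflow's bitshift dependency syntax on the
# task objects themselves; dag.get_task looks a task up by its task_id:
# for downstream, upstream_list in deps.items():
#     for upstream in upstream_list:
#         dag.get_task(upstream) >> dag.get_task(downstream)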