Skip to content

Instantly share code, notes, and snippets.

@vitorbaptista
Created April 25, 2017 19:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vitorbaptista/640db30fd77a18bec1a2223ca92a50a1 to your computer and use it in GitHub Desktop.
Save vitorbaptista/640db30fd77a18bec1a2223ca92a50a1 to your computer and use it in GitHub Desktop.
Simplest DAG for OpenTrials's Airflow
from datetime import datetime
import airflow.models
from airflow.operators.latest_only_operator import LatestOnlyOperator
import utils.helpers as helpers
args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 4, 1),
'retries': 1,
# We could/should also set the "email" here, to be alerted
# when something goes wrong with this DAG
}
dag = airflow.models.DAG(
dag_id='guide',
default_args=args,
max_active_runs=1,
schedule_interval='@monthly'
)
# Most of our collectors/processors extract all data when run,
# so if there's more than 1 DAG run waiting to execute, we only
# need to run the latest one.
latest_only_task = LatestOnlyOperator(
task_id='latest_only',
dag=dag,
)
# Creates collector and processor tasks, setting the default
# environment variables, Docker command and image.
collector_task = helpers.create_collector_task(
name='guide',
dag=dag,
)
processor_task = helpers.create_processor_task(
name='guide',
dag=dag
)
# If you're processing new trials, trigger the
# "merge_identifiers_and_reindex" DAG to finish the trial processing
# common to all our trial processors.
merge_identifiers_and_reindex_task = helpers.create_trigger_subdag_task(
trigger_dag_id='merge_identifiers_and_reindex',
dag=dag
)
# Set the dependency order
collector_task.set_upstream(latest_only_task)
processor_task.set_upstream(collector_task)
merge_identifiers_and_reindex_task.set_upstream(processor_task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment