Skip to content

Instantly share code, notes, and snippets.

@itajaja
Created May 10, 2016 15:06
Show Gist options
  • Save itajaja/cd868a2869c74ce7e5e3a8a1a918bde9 to your computer and use it in GitHub Desktop.
Save itajaja/cd868a2869c74ce7e5e3a8a1a918bde9 to your computer and use it in GitHub Desktop.
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import EcsOperator, PythonOperator

from utils import get_last_run_date
# -----------------------------------------------------------------------------
# Deployment settings, read from the process environment when this file is
# imported; a missing variable raises KeyError and the module fails to load.
ECS_CLUSTER = os.environ['XXXX_AIRFLOW_ECS_CLUSTER']
AWS_ACCESS_KEY_ID = os.environ['XXXX_AIRFLOW_AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = os.environ['XXXX_AIRFLOW_AWS_SECRET_ACCESS_KEY']

# Variables handed to the ECS container, as a tuple of
# {'name': ..., 'value': ...} entries in declaration order.
# NOTE(review): this forwards the AWS secret key in plain text inside the
# container overrides, where anything that displays or logs the overrides can
# expose it; an ECS task IAM role would avoid shipping the key at all.
ENVIRONMENT = tuple(
    {'name': env_name, 'value': env_value}
    for env_name, env_value in (
        ('XXXX_AIRFLOW_AWS_ACCESS_KEY_ID', AWS_ACCESS_KEY_ID),
        ('XXXX_AIRFLOW_AWS_SECRET_ACCESS_KEY', AWS_SECRET_ACCESS_KEY),
        (
            'XXXX_AIRFLOW_S3_BUCKET_NAME',
            os.environ['XXXX_AIRFLOW_S3_BUCKET_NAME'],
        ),
    )
)

# Jinja template for the finished-runs file path; the suffix is whatever the
# ``last_run_date`` task made available via XCom (presumably its return
# value), substituted when the consuming operator renders its templates.
FINISHED_RUNS_FILENAME = (
    'dags/process_finished_runs/finished_runs_'
    '{{ ti.xcom_pull(task_ids="last_run_date") }}'
)
# -----------------------------------------------------------------------------
# Airflow starts a run only after its schedule interval has closed, i.e. at
# ``start_date + schedule_interval``.  A bare ``datetime.today()`` is
# re-evaluated on every parse of this file, so that moment always lies an hour
# in the future and the hourly DAG never gets scheduled.  Truncating to the top
# of the hour and stepping back one hour yields a start date that is stable for
# a whole hour and whose first interval has already closed.
# NOTE(review): Airflow recommends a fixed, static start_date; switch to one
# (with ``catchup=False`` on Airflow >= 1.8) if backfilling from that date is
# acceptable for this pipeline.
_START_DATE = (
    datetime.today().replace(minute=0, second=0, microsecond=0)
    - timedelta(hours=1)
)
args = {
    'owner': 'XXX',
    'start_date': _START_DATE,
}
dag = DAG(
    dag_id='process_finished_runs',
    default_args=args,
    schedule_interval='@hourly',
)
# -----------------------------------------------------------------------------
# Computes the value that the downstream ECS tasks pull from XCom.
last_run_date = PythonOperator(
    dag=dag,
    task_id='last_run_date',
    python_callable=get_last_run_date,
    op_args=('process_finished_runs',),
)


def _container_task(task_id, task_definition, command):
    """Return an EcsOperator running *command* in the ``XXXXXXXX`` container.

    Every ECS task in this DAG shares the same credentials, cluster, container
    name and container environment; only the task id, the task definition and
    the command differ.  Entries of *command* may be Jinja templates; they are
    presumably rendered by the operator before launch (TODO confirm that its
    ``template_fields`` include ``overrides``).
    """
    container_override = {
        'name': 'XXXXXXXX',
        'command': command,
        'environment': ENVIRONMENT,
    }
    return EcsOperator(
        task_id=task_id,
        aws_access_key_id=AWS_ACCESS_KEY_ID,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        cluster=ECS_CLUSTER,
        task_definition=task_definition,
        overrides={'containerOverrides': [container_override]},
        dag=dag,
    )


process_finished_runs_items = _container_task(
    'process_finished_runs_items',
    'XXXXXXXX',
    [
        'process_finished_runs',
        FINISHED_RUNS_FILENAME,
        '{{ ti.xcom_pull(task_ids="last_run_date") }}',
    ],
)
process_finished_runs_data = _container_task(
    'process_finished_runs_data',
    'XXXXXXXXX',
    ['process_finished_runs_data', FINISHED_RUNS_FILENAME],
)
# -----------------------------------------------------------------------------
# Linear pipeline:
#   last_run_date -> process_finished_runs_items -> process_finished_runs_data
process_finished_runs_items.set_upstream(last_run_date)
process_finished_runs_data.set_upstream(process_finished_runs_items)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment