@alucarded
Last active December 4, 2020 19:09
Airflow DAG definition file that dynamically generates DAGs from an Airflow Variable (to pull economic data as soon as it is released)
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import logging
import airflow
from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import Variable
from airflow.contrib.hooks.mongo_hook import MongoHook
import dateutil.parser
import json
import requests
import pytz
logging.basicConfig(level=logging.INFO)
log = logging.getLogger('events_extraction_agent')
# Request headers (e.g. authentication) for the economic-calendar API, kept in an Airflow Variable
ECONOMIC_EVENTS_API_HEADERS = Variable.get("economic_events_api_headers", deserialize_json=True)
DAG_DEFAULT_ARGS = {
    'owner': 'Tom',
    'depends_on_past': False,
    'email': ['mail@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'provide_context': True
}
EXTRACT_TASK_ID = 'economic_event_extract_task'
TRANSFORM_TASK_ID = 'economic_event_transform_task'
LOAD_TASK_ID = 'economic_event_load_task'
def check_event_updated(response):
    if not response.ok:
        return False
    event = response.json()[0]
    # Fail task if data not yet available
    if not event['actual']:
        log.warning("Event data not updated. Failing task.")
        return False
    return True
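# Note: check_event_updated is used as the extract task's response_check below.
# A hypothetical payload (field names inferred from this file, not from any API
# documentation) could look like:
#   [{"id": "...", "dateUtc": "2020-12-04T13:30:00Z",
#     "periodDateUtc": "2020-11-01T00:00:00Z", "actual": 245000, ...}]
# 'actual' is empty until the figure is published, so the extract task fails and
# is retried (up to 'retries' times, spaced by 'retry_delay').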
def event_transform_task(**context):
    event_response = context['ti'].xcom_pull(task_ids=EXTRACT_TASK_ID)
    log.info("Event from API: {0}".format(event_response))
    e = json.loads(event_response)[0]
    # Reuse the API's event id as the MongoDB document _id
    e['_id'] = e['id']
    del e['id']
    return e
def event_load_task(**context):
    event = context['ti'].xcom_pull(task_ids=TRANSFORM_TASK_ID)
    # Parse string datetimes
    event['dateUtc'] = dateutil.parser.parse(event['dateUtc'])
    if event['periodDateUtc']:
        event['periodDateUtc'] = dateutil.parser.parse(event['periodDateUtc'])
    # Load to Mongo
    mongo = MongoHook(conn_id="findata_mongo")
    collection = mongo.get_collection('CalendarEventData')
    res = collection.update_one({'_id': event['_id']}, {'$set': event})
    log.info("Got update result: {0}".format(res.raw_result))
def add_event_update_dag(event):
    event_date = dateutil.parser.parse(event['dateUtc'])
    dag_id = "economic_event_update_{0}".format(event['eventId'])
    desc = "Scheduling DAG at {2} for event {0} with ID {1}".format(event['name'], event['eventId'], event_date)
    # A '@once' DAG whose start_date is the release time runs exactly once, right
    # when the event data is expected to become available. Registering the DAG
    # object in globals() is what lets the Airflow scheduler discover it.
    dag = DAG(
        dag_id,
        default_args=DAG_DEFAULT_ARGS,
        description=desc,
        schedule_interval='@once',
        start_date=event_date,
        is_paused_upon_creation=False
    )
    globals()[dag_id] = dag
    log.info(desc)
    # Extract: fetch the latest historical value for the event; the
    # response_check fails the task (triggering retries) until 'actual' is set.
    endpoint = "/events/{0}/historical?take=1".format(event['eventId'])
    extract = SimpleHttpOperator(task_id=EXTRACT_TASK_ID,
                                 endpoint=endpoint,
                                 method='GET',
                                 headers=ECONOMIC_EVENTS_API_HEADERS,
                                 response_check=check_event_updated,
                                 xcom_push=True,
                                 http_conn_id="economic_events_api",
                                 dag=dag)
    # Transform: reshape the API payload into a MongoDB document.
    transform = PythonOperator(task_id=TRANSFORM_TASK_ID,
                               python_callable=event_transform_task,
                               provide_context=True,
                               dag=dag)
    # Load: write the document into MongoDB.
    load = PythonOperator(task_id=LOAD_TASK_ID,
                          python_callable=event_load_task,
                          provide_context=True,
                          dag=dag)
    extract >> transform >> load
# Dynamically generate DAGs, one per event in the 'economic_events' Variable
events = Variable.get("economic_events", deserialize_json=True, default_var=[])
for e in events:
    add_event_update_dag(e)
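The file above assumes two Airflow Connections (economic_events_api for the calendar API and findata_mongo for MongoDB) and two Airflow Variables. A minimal sketch of how the Variables could be seeded, assuming a hypothetical event entry with the eventId, name and dateUtc fields the code reads (header names and event values are placeholders, not the real API's):

from airflow.models import Variable

# Hypothetical request headers for the calendar API; the real header names
# depend on the provider.
Variable.set("economic_events_api_headers",
             {"x-api-key": "YOUR_API_KEY", "Accept": "application/json"},
             serialize_json=True)

# Hypothetical list of upcoming releases; each entry supplies the fields read
# above: eventId, name and dateUtc (the release time, used as start_date).
Variable.set("economic_events",
             [{"eventId": "nonfarm-payrolls",
               "name": "US Non-Farm Payrolls",
               "dateUtc": "2020-12-04T13:30:00Z"}],
             serialize_json=True)

These calls need a configured Airflow metadata database; the same values can also be entered in the web UI under Admin > Variables.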