Last active
December 15, 2020 22:30
-
-
Save Kamparia/fd201bc6ea87c3148ef9a5e022d0c65d to your computer and use it in GitHub Desktop.
Data Pipeline With Apache Airflow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
from datetime import datetime, timedelta | |
from airflow import DAG | |
from airflow.models import Variable | |
from airflow.operators.python_operator import PythonOperator | |
from airflow.operators.postgres_operator import PostgresOperator | |
from airflow.hooks.postgres_hook import PostgresHook | |
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator | |
def get_events_from_api(**context): | |
""" Returns from the API an array of events with magnitude greater than 5.0. """ | |
events = [] | |
response = requests.get(Variable.get("USGS_API_URL")) | |
for event in response.json().get("features", []): | |
timestamp = event["properties"]["time"]/1000 | |
date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') | |
data = { | |
"event_id": event["id"], | |
"event_name": event["properties"]["title"], | |
"magnitude": float(event["properties"]["mag"]), | |
"longitude": float(event["geometry"]["coordinates"][0]), | |
"latitude": float(event["geometry"]["coordinates"][1]), | |
"date": date | |
} | |
if float(event["properties"]["mag"]) >= 5: | |
events.append(data) | |
# push events data to next task instance using XCom | |
context["ti"].xcom_push(key = "events", value = events) | |
def save_events_to_db(**context):
    """Insert the harvested earthquake events into Postgres.

    Pulls the ``events`` list pushed to XCom by the ``get_new_events`` task and
    inserts one row per event into ``public.earthquake_events``.
    """
    insert_query = """
    INSERT INTO public.earthquake_events (event_id, event_name, magnitude, longitude, latitude, date)
    VALUES (%s, %s, %s, %s, %s, %s);
    """
    # pull event data from previous task instance using XCom
    events = context["ti"].xcom_pull(task_ids='get_new_events', key='events')
    # Build the hook (and its DB connection) once, not once per event.
    hook = PostgresHook(postgres_conn_id="postgres_default", schema="usgs_eq")
    # ``xcom_pull`` returns None when the upstream task pushed nothing.
    for event in events or []:
        # Relies on the event dict's insertion order matching the column list above.
        hook.run(insert_query, parameters=tuple(event.values()))
# Default arguments applied to every task in the DAG.
# NOTE: the previous "schedule_interval": "@daily" entry was removed — it is
# not a task-level argument, Airflow silently ignores it here, and it
# contradicted the explicit '0 8 * * *' schedule passed to the DAG itself.
default_args = {
    "owner": "olaoye.somide",
    "start_date": datetime(2020, 12, 12),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
# Instantiate the Airflow DAG: harvests USGS earthquake data daily at 08:00.
with DAG('usgs_harvester', default_args=default_args, schedule_interval='0 8 * * *') as dag:
    # Task 1: Create Postgres Table (if none exists).
    task_one = PostgresOperator(
        task_id='create_table',
        sql='''CREATE TABLE IF NOT EXISTS public.earthquake_events (
                event_id VARCHAR(50) NOT NULL,
                event_name VARCHAR(160) NOT NULL,
                magnitude DECIMAL NOT NULL,
                longitude DECIMAL NOT NULL,
                latitude DECIMAL NOT NULL,
                date DATE NOT NULL);''',
        postgres_conn_id='postgres_default',
        database=Variable.get("USGS_DB_NAME"),
    )
    # Task 2: Requests new events data from the USGS Earthquake API.
    task_two = PythonOperator(
        task_id='get_new_events',
        python_callable=get_events_from_api,
        provide_context=True,  # Airflow 1.x style: pass the task-instance context as kwargs
    )
    # Task 3: Store the new events data in Postgres.
    task_three = PythonOperator(
        task_id='save_events_to_db',
        python_callable=save_events_to_db,
        provide_context=True,
    )
    # Task 4: Send Slack notifications to team members.
    task_four = SlackWebhookOperator(
        task_id='send_slack_notification',
        http_conn_id='slack_conn',
        webhook_token=Variable.get("SLACK_API_TOKEN"),
        # Fixed grammar in the user-facing message ("was" -> "were").
        message="New earthquake events were successfully harvested.",
        username='airflow',
    )
    # execute pipeline tasks in series
    task_one >> task_two >> task_three >> task_four
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment