Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Data Pipeline With Apache Airflow
import requests
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
# Seconds to wait for the USGS API before failing the task.
USGS_API_TIMEOUT_SECONDS = 30


def extract_significant_events(features, min_magnitude=5.0):
    """Flatten USGS GeoJSON features into event dicts, keeping strong quakes.

    :param features: the ``features`` list of a USGS GeoJSON response
        (``None`` is treated as an empty list).
    :param min_magnitude: inclusive lower bound on magnitude (default 5.0).
    :return: list of dicts with keys event_id, event_name, magnitude,
        longitude, latitude and date, in that key order. ``date`` is a UTC
        ``'%Y-%m-%d %H:%M:%S'`` string.
    """
    from datetime import timezone

    events = []
    for feature in features or []:
        properties = feature["properties"]
        raw_magnitude = properties["mag"]
        # A null magnitude cannot be compared; float(None) would raise TypeError.
        if raw_magnitude is None:
            continue
        magnitude = float(raw_magnitude)
        if magnitude >= min_magnitude:
            coordinates = feature["geometry"]["coordinates"]
            # "time" is divided by 1000, i.e. treated as epoch milliseconds.
            timestamp = properties["time"] / 1000
            date = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(
                "%Y-%m-%d %H:%M:%S"
            )
            events.append({
                "event_id": feature["id"],
                "event_name": properties["title"],
                "magnitude": magnitude,
                "longitude": float(coordinates[0]),
                "latitude": float(coordinates[1]),
                "date": date,
            })
    return events


def get_events_from_api(**context):
    """Fetch the USGS feed and push events of magnitude >= 5.0 to XCom.

    The feed URL is read from the Airflow Variable ``USGS_API_URL``. The
    filtered events are pushed under the XCom key ``"events"``; the function
    itself returns nothing.

    :raises requests.HTTPError: if the API responds with a 4xx/5xx status.
    :raises requests.exceptions.Timeout: if the API does not answer within
        ``USGS_API_TIMEOUT_SECONDS``.
    """
    response = requests.get(
        Variable.get("USGS_API_URL"), timeout=USGS_API_TIMEOUT_SECONDS
    )
    # Fail the task on HTTP errors instead of "successfully" harvesting nothing.
    response.raise_for_status()
    events = extract_significant_events(response.json().get("features"))
    # Push events data to the next task instance using XCom.
    context["ti"].xcom_push(key="events", value=events)
# Column order of public.earthquake_events; rows are built in this order
# explicitly instead of relying on dict insertion order.
EARTHQUAKE_EVENT_COLUMNS = (
    "event_id",
    "event_name",
    "magnitude",
    "longitude",
    "latitude",
    "date",
)


def events_to_rows(events):
    """Convert event dicts into row tuples ordered like EARTHQUAKE_EVENT_COLUMNS.

    :param events: iterable of event dicts (as pulled from XCom), or ``None``.
    :return: list of tuples, one per event; empty when there are no events.
    :raises KeyError: if an event lacks one of the expected columns.
    """
    if not events:
        return []
    return [
        tuple(event[column] for column in EARTHQUAKE_EVENT_COLUMNS)
        for event in events
    ]


def save_events_to_db(**context):
    """Insert the events pushed by the ``get_new_events`` task into Postgres.

    Events are pulled from XCom (key ``"events"``) and written to
    ``public.earthquake_events`` in a single batch through one hook. This
    replaces the previous approach of opening one connection per row. Does
    nothing when there are no events.

    NOTE(review): the table has no unique key, so re-running this task
    inserts the same rows again.
    """
    # Pull event data from the previous task instance using XCom.
    events = context["ti"].xcom_pull(task_ids="get_new_events", key="events")
    rows = events_to_rows(events)
    if not rows:
        return
    hook = PostgresHook(
        postgres_conn_id="postgres_default",
        # Use the same database the create_table task targets.
        schema=Variable.get("USGS_DB_NAME", default_var="usgs_eq"),
    )
    hook.insert_rows(
        table="public.earthquake_events",
        rows=rows,
        target_fields=EARTHQUAKE_EVENT_COLUMNS,
    )
# Default arguments passed to the DAG and applied to its tasks.
# schedule_interval is deliberately absent: it is a DAG-level argument given to
# DAG(...) directly, and an "@daily" entry here only contradicted that schedule.
default_args = {
    "owner": "olaoye.somide",
    "start_date": datetime(2020, 12, 12),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}
# Instantiate the Airflow DAG (cron "0 8 * * *": every day at 08:00).
with DAG(
    "usgs_harvester",
    default_args=default_args,
    schedule_interval="0 8 * * *",
    # get_new_events always reads the current feed and ignores the run's
    # execution date, so catching up every interval since start_date would
    # only insert the same events over and over.
    catchup=False,
) as dag:
    # Task 1: Create Postgres table (if none exists).
    # NOTE(review): `date` is declared DATE, but the harvested events carry
    # '%Y-%m-%d %H:%M:%S' strings, so the time of day is dropped on insert;
    # consider TIMESTAMP (CREATE TABLE IF NOT EXISTS won't alter existing tables).
    # NOTE(review): Variable.get below runs on every parse of this DAG file,
    # not only when the task executes.
    task_one = PostgresOperator(
        task_id="create_table",
        sql='''CREATE TABLE IF NOT EXISTS public.earthquake_events (
event_id VARCHAR(50) NOT NULL,
event_name VARCHAR(160) NOT NULL,
magnitude DECIMAL NOT NULL,
longitude DECIMAL NOT NULL,
latitude DECIMAL NOT NULL,
date DATE NOT NULL);''',
        postgres_conn_id="postgres_default",
        database=Variable.get("USGS_DB_NAME"),
    )

    # Task 2: Request new events data from the USGS Earthquake API.
    task_two = PythonOperator(
        task_id="get_new_events",
        python_callable=get_events_from_api,
        provide_context=True,
    )

    # Task 3: Store the new events data in Postgres.
    task_three = PythonOperator(
        task_id="save_events_to_db",
        python_callable=save_events_to_db,
        provide_context=True,
    )

    # Task 4: Send a Slack notification to team members.
    task_four = SlackWebhookOperator(
        task_id="send_slack_notification",
        http_conn_id="slack_conn",
        webhook_token=Variable.get("SLACK_API_TOKEN"),
        message="New earthquake events was successfully harvested.",
        username="airflow",
    )

    # Execute pipeline tasks in series.
    task_one >> task_two >> task_three >> task_four
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment