Skip to content

Instantly share code, notes, and snippets.

@Kamparia
Created December 15, 2020 08:02
Show Gist options
  • Save Kamparia/d564a11159f9353c7576c3d426af213b to your computer and use it in GitHub Desktop.
Save Kamparia/d564a11159f9353c7576c3d426af213b to your computer and use it in GitHub Desktop.
Task 3: Store the new events data in Postgres.
def save_events_to_db(**context):
""" Inserts the events to database. """
insert_query = """
INSERT INTO public.earthquake_events (event_id, event_name, magnitude, longitude, latitude, date)
VALUES (%s, %s, %s, %s, %s, %s);
"""
# pull event data from previous task instance using XCom
events = context["ti"].xcom_pull(task_ids='get_new_events', key='events')
for event in events:
params = tuple(event.values())
PostgresHook(
postgres_conn_id="postgres_default",
schema="usgs_eq"
).run(insert_query, parameters=params)
# Task 3: Store the new events data in Postgres.
# The PythonOperator calls the Python Function defined above.
task_three = PythonOperator(
task_id = 'save_events_to_db',
python_callable = save_events_to_db,
provide_context = True
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment