Task 3: Store the new events data in Postgres.
def save_events_to_db(**context): | |
""" Inserts the events to database. """ | |
insert_query = """ | |
INSERT INTO public.earthquake_events (event_id, event_name, magnitude, longitude, latitude, date) | |
VALUES (%s, %s, %s, %s, %s, %s); | |
""" | |
# pull event data from previous task instance using XCom | |
events = context["ti"].xcom_pull(task_ids='get_new_events', key='events') | |
for event in events: | |
params = tuple(event.values()) | |
PostgresHook( | |
postgres_conn_id="postgres_default", | |
schema="usgs_eq" | |
).run(insert_query, parameters=params) | |
# Task 3: Store the new events data in Postgres. | |
# The PythonOperator calls the Python Function defined above. | |
task_three = PythonOperator( | |
task_id = 'save_events_to_db', | |
python_callable = save_events_to_db, | |
provide_context = True | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment