Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Task 2: Requests new events data from the USGS Earthquake API.
def get_events_from_api(**context):
"""Returns from the API an array of events with magnitude greater than 5.0"""
events = []
response = requests.get(Variable.get("USGS_API_URL"))
for event in response.json().get("features", []):
timestamp = event["properties"]["time"]/1000
date = datetime.utcfromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
data = {
"event_id": event["id"],
"event_name": event["properties"]["title"],
"magnitude": float(event["properties"]["mag"]),
"longitude": float(event["geometry"]["coordinates"][0]),
"latitude": float(event["geometry"]["coordinates"][1]),
"date": date
}
if float(event["properties"]["mag"]) >= 5:
events.append(data)
# push events data to next task instance using XCom
context["ti"].xcom_push(key="events", value=events)
# Task 2: Requests new events data from the USGS Earthquake API.
# The PythonOperator calls the Python Function defined above.
task_two = PythonOperator(
task_id = 'get_new_events',
python_callable = get_events_from_api,
provide_context = True
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment