Skip to content

Instantly share code, notes, and snippets.

@thuwarakeshm
Last active August 15, 2021 14:11
Show Gist options
  • Save thuwarakeshm/68b638119c100dc9c796904460d6afcb to your computer and use it in GitHub Desktop.
Save thuwarakeshm/68b638119c100dc9c796904460d6afcb to your computer and use it in GitHub Desktop.
ETL with Prefect
# Prefect local secrets; presumably these are the SMTP login that EmailTask
# reads (EMAIL_USERNAME / EMAIL_PASSWORD) -- confirm against the Prefect docs.
# Replace the placeholders locally and never commit real credentials.
[context.secrets]
EMAIL_USERNAME = "<Your email id>"
EMAIL_PASSWORD = "<your email password>"
# Import the email task and the task decorator.
from prefect import task
from prefect.tasks.notifications.email_task import EmailTask
...
# Create an email_task object. Use all static content here.
email_task = EmailTask(
    subject="A new windspeed captured",
    email_to="receiver_email@gmail.com",
    email_from="your_email@gmail.com",
)


@task
def format_windspeed(speed: float) -> str:
    """Render a wind-speed value as the email body text.

    This has to be a task. Calling ``str(windspeed)`` directly inside the
    ``Flow`` context would stringify the Task object while the flow is being
    built (producing something like ``"<Task: transform>"``), not the value
    produced at run time.

    Args:
        speed (float): Windspeed from the transform task.

    Returns:
        str: The speed formatted with ``str``.
    """
    return str(speed)


with Flow("Windspeed Tracker", schedule=schedule) as flow:
    ...
    # Call the email task with variable content. The message comes from a
    # task so it reflects windspeed's runtime value.
    email_task(
        msg=format_windspeed(windspeed),
    )
import requests
def extract(city: str = "Boston") -> dict:
    """Use the Open Weather Map API to fetch current weather data for a city.

    Args:
        city (str): City name sent as the ``q`` query parameter.
            Defaults to ``"Boston"``.

    Returns:
        dict: The parsed JSON response with many weather measurements.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status, so an
            error payload is never mistaken for weather data.
        requests.Timeout: If the API does not respond within 10 seconds.
    """
    import os

    url = "https://api.openweathermap.org/data/2.5/weather"
    # TODO: Use a real API key. You can get a free one from https://openweathermap.org/
    # Prefer a key from the environment so it does not have to live in source.
    api_key = os.environ.get(
        "OPENWEATHERMAP_API_KEY", "e5ecbcc49e3debeead24d0fe012fb46e"
    )
    response = requests.get(
        url, params={"q": city, "appid": api_key}, timeout=10
    )
    # Without this check, an error body (e.g. invalid key) would reach
    # transform() and be silently recorded as a 0.0 wind speed.
    response.raise_for_status()
    return response.json()
def transform(response: dict) -> float:
    """Pull the current wind speed out of an Open Weather Map response.

    Args:
        response (dict): Parsed JSON payload returned by the extract step.

    Returns:
        float: The value stored under ``wind.speed``, or ``0.0`` when the
        payload has no ``wind`` section or that section has no ``speed``.
    """
    wind_section = response.get("wind", {})
    current_speed = wind_section.get("speed", 0.0)
    return current_speed
def load(speed: float, path: str = "windspeed.txt"):
    """Append one wind-speed reading to a text file, one value per line.

    Args:
        speed (float): Windspeed from the transform step.
        path (str): File to append to. Defaults to ``"windspeed.txt"``,
            resolved relative to the current working directory.
    """
    # Explicit encoding so the output does not depend on the platform locale.
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{speed}\n")
if __name__ == "__main__":
    # Run the three ETL stages back to back: fetch, reduce, then persist.
    load(transform(extract()))
# Import parameters
from prefect import task, Flow, Parameter
# Tweak the function to accept a city argument.
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract(city: str) -> dict:
    """Fetch current weather data for ``city`` from the Open Weather Map API.

    Args:
        city (str): City name sent as the ``q`` query parameter.

    Returns:
        dict: The parsed JSON response.

    Raises:
        requests.HTTPError: On a 4xx/5xx response. Raising is what lets the
            task's ``max_retries`` policy take effect; returning the error
            body would make the task "succeed" without any weather data.
        requests.Timeout: If the API does not respond within 10 seconds.
    """
    import os

    url = "https://api.openweathermap.org/data/2.5/weather"
    # TODO: Use a real API key. You can get a free one from https://openweathermap.org/
    # Prefer a key from the environment so it does not have to live in source.
    api_key = os.environ.get(
        "OPENWEATHERMAP_API_KEY", "e5ecbcc49e3debeead24d0fe012fb46e"
    )
    response = requests.get(
        url, params={"q": city, "appid": api_key}, timeout=10
    )
    response.raise_for_status()
    return response.json()
...
with Flow("Windspeed Tracker", schedule=schedule) as flow:
    # Create a "city" parameter with the default value "Boston" and pass it
    # to the extract task in place of a hard-coded city.
    city = Parameter("city", default="Boston")
    response = extract(city)
    # Remaining task calls are elided in this snippet.
    ...
    ...
import requests
# Import Prefect's task decorator and Flow, plus Python's timedelta
from prefect import task, Flow
from datetime import timedelta
# Decorator: retry a failed extract up to 3 times, waiting 3 minutes
# between attempts.
@task(retry_delay=timedelta(minutes=3), max_retries=3)
def extract() -> dict:
    """Fetch the raw weather payload (implementation elided in this snippet)."""
    ...
@task
def transform(response: dict) -> float:
    """Reduce the weather payload to a wind speed (body elided in this snippet)."""
    ...
# Retry a failed write up to 3 times, 5 seconds apart. Note that a bare
# ``timedelta(5)`` means 5 *days*: its first positional argument is days.
# NOTE(review): 5 seconds is assumed to be the intended delay -- confirm.
@task(max_retries=3, retry_delay=timedelta(seconds=5))
def load(speed: float):
    """Append the wind speed to the output file (body elided in this snippet).

    Args:
        speed (float): Windspeed from the transform task.
    """
    ...
# Create a Prefect flow (no schedule). The task calls below wire the tasks
# together; presumably nothing executes until flow.run() is called further
# down -- Prefect 1.x functional API, confirm.
with Flow("Windspeed Tracker") as flow:
    # Execute tasks in the right order: each call consumes the previous result.
    response = extract()
    windspeed = transform(response)
    load(windspeed)
if __name__ == "__main__":
    # Execute the flow when this file is run directly as a script.
    flow.run()
...
# Imports to facilitate Scheduling.
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
...
# Timezone-aware UTC "now": datetime.utcnow() returns a naive datetime and
# is deprecated since Python 3.12.
from datetime import timezone

# Create a schedule object: the first run is ~5 seconds from now, and then
# one run every minute.
schedule = IntervalSchedule(
    start_date=datetime.now(timezone.utc) + timedelta(seconds=5),
    interval=timedelta(minutes=1),
)
# Attach the schedule object to the windspeed tracker flow. The task chain
# (extract -> transform -> load) is the same as in the unscheduled flow.
with Flow("Windspeed Tracker", schedule=schedule) as flow:
    response = extract()
    windspeed = transform(response)
    load(windspeed)
if __name__ == "__main__":
    # Register the scheduled flow under the "Tutorial" project.
    flow.register(project_name='Tutorial')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment