Skip to content

Instantly share code, notes, and snippets.

Last active August 15, 2021 14:11
Show Gist options
  • Save thuwarakeshm/68b638119c100dc9c796904460d6afcb to your computer and use it in GitHub Desktop.
Save thuwarakeshm/68b638119c100dc9c796904460d6afcb to your computer and use it in GitHub Desktop.
ETL with Prefect
# Placeholder email credentials.
# NOTE(review): presumably consumed by the email notification task below — confirm.
# Replace locally (or supply via a secrets mechanism); never commit real values here.
EMAIL_USERNAME = "<Your email id>"
EMAIL_PASSWORD = "<your email password>"
# Import email task
from prefect.tasks.notifications.email_task import EmailTask
# Create an email_task object. Use all static content here.
# NOTE(review): this snippet is incomplete as shown. The EmailTask( call below is never
# closed, and the Flow body holds only a comment (no email_task call), so it will not
# parse. `Flow` (imported later in this file) and `schedule` (assigned later in this
# file) are also not yet defined at this point. Restore the missing lines before use.
email_task = EmailTask(
subject="A new windspeed captured",
with Flow("Windspeed Tracker", schedule=schedule) as flow:
# Call the email task with variable content.
import requests
def extract(city: str = "Boston") -> dict:
    """Use the Open Weather Map API to fetch current weather data for a city.

    Args:
        city (str): City name sent as the ``q`` query parameter.
            Defaults to "Boston", the city this function originally hard-coded.

    Returns:
        dict: a JSON response of many weather measurements.
    """
    import os  # local import keeps this snippet self-contained

    url = "https://api.openweathermap.org/data/2.5/weather"
    # TODO: Use a real API key. You can get a free one from OpenWeatherMap.
    # Setting OPENWEATHERMAP_API_KEY overrides the hard-coded key below, so a personal
    # key never has to be committed to source.
    api_key = os.environ.get(
        "OPENWEATHERMAP_API_KEY", "e5ecbcc49e3debeead24d0fe012fb46e"
    )
    response = requests.request(
        "GET",
        url,
        params={"q": city, "appid": api_key},
        # Without a timeout, requests can block forever on a stalled connection.
        timeout=10,
    )
    return response.json()
def transform(response: dict) -> float:
    """Process the response and extract windspeed information.

    Args:
        response (dict): Response JSON from extract task.

    Returns:
        float: Current wind speed, or 0.0 when the response has no
        ``wind`` entry or that entry has no ``speed``.
    """
    # Chained .get() with defaults: a payload missing wind data yields 0.0
    # instead of raising KeyError.
    return response.get("wind", {}).get("speed", 0.0)
def load(speed: float, path: str = "windspeed.txt"):
    """Append data to file, one reading per line.

    Args:
        speed (float): Windspeed from transform task.
        path (str): File to append to. Defaults to "windspeed.txt" in the
            current working directory, as before.
    """
    # Mode "a" keeps earlier readings. An explicit encoding avoids depending on
    # the platform's locale default.
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{speed}\n")
if __name__ == "__main__":
    # Execute tasks in the right order: extract -> transform -> load.
    response = extract()
    windspeed = transform(response)
    # Without this call the transformed value was computed and then discarded.
    load(windspeed)
import os
from datetime import timedelta  # used by the @task decorator below

# Import parameters
from prefect import task, Flow, Parameter


# Tweak the function to accept a city argument.
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract(city: str) -> dict:
    """Fetch current weather data for ``city`` from the Open Weather Map API.

    Runs as a Prefect task, retried up to 3 times with 3 minutes between attempts.

    Args:
        city (str): City name sent as the ``q`` query parameter.

    Returns:
        dict: a JSON response of many weather measurements.

    Raises:
        requests.HTTPError: on a non-2xx response, so the task's retries apply.
    """
    url = "https://api.openweathermap.org/data/2.5/weather"
    # TODO: Use a real API key. You can get a free one from OpenWeatherMap.
    # Setting OPENWEATHERMAP_API_KEY overrides the hard-coded key below.
    api_key = os.environ.get(
        "OPENWEATHERMAP_API_KEY", "e5ecbcc49e3debeead24d0fe012fb46e"
    )
    response = requests.request(
        "GET",
        url,
        params={"q": city, "appid": api_key},
        # A stalled connection would otherwise hang the task forever.
        timeout=10,
    )
    # Retries only help if failures raise. Without this, an HTTP error response
    # would be returned as ordinary JSON and never retried.
    response.raise_for_status()
    return response.json()
with Flow("Windspeed Tracker", schedule=schedule) as flow:
    # Create a city parameter with the default value "Boston" and pass it to the
    # extract task.
    # NOTE(review): `schedule` must already be defined when this runs. This snippet
    # only wires up extract — confirm whether transform/load were meant to follow.
    city = Parameter("city", default="Boston")
    response = extract(city)
import requests
# Importing Prefect task, Flow and Python timedelta
from prefect import task, Flow
from datetime import timedelta
# Decorator specifying how many times to retry and the delay between retries.
# NOTE(review): the bodies of extract, transform and load are missing from this
# snippet (each `def` is followed directly by the next definition), so it will not
# parse as shown. Presumably they match the plain versions earlier in the file — confirm.
@task(max_retries=3, retry_delay=timedelta(minutes=3))
def extract() -> dict:
# NOTE(review): unlike extract and load, transform has no @task decorator here, yet
# the Flow below calls it with extract's result. Confirm a decorator was not lost.
def transform(response: dict) -> float:
# NOTE(review): timedelta(5) is 5 *days* (timedelta's first positional argument is
# days), whereas extract uses minutes=3. Confirm the intended retry delay.
@task(max_retries=3, retry_delay=timedelta(5))
def load(speed: float):
# Create a Prefect flow
with Flow("Windspeed Tracker") as flow:
    # Execute tasks in the right order: extract -> transform -> load.
    response = extract()
    windspeed = transform(response)
    load(windspeed)
if __name__ == "__main__":
    # Execute the flow
    flow.run()
# Imports to facilitate Scheduling.
from datetime import timedelta, datetime
from prefect.schedules import IntervalSchedule
# Create a schedule object
# NOTE(review): the IntervalSchedule( call below is never closed and passes no
# `interval`. The remaining argument(s) and closing parenthesis appear to be missing
# from this copy — confirm before use.
schedule = IntervalSchedule(
# First run 5 seconds from now; datetime.utcnow() yields a naive UTC timestamp.
start_date=datetime.utcnow() + timedelta(seconds=5),
# Attach the schedule object to the windspeed tracker flow.
with Flow("Windspeed Tracker", schedule=schedule) as flow:
    # Execute tasks in the right order: extract -> transform -> load.
    response = extract()
    windspeed = transform(response)
    load(windspeed)
if __name__ == "__main__":
    # Execute the (scheduled) flow.
    flow.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment