Skip to content

Instantly share code, notes, and snippets.

@kmoonwright
Created August 11, 2020 17:14
Show Gist options
  • Save kmoonwright/22e99e9214e46868f790be9d61d9861b to your computer and use it in GitHub Desktop.
Save kmoonwright/22e99e9214e46868f790be9d61d9861b to your computer and use it in GitHub Desktop.
import datetime
import pendulum
import requests
from prefect import task, Flow, Parameter, case
from prefect.schedules import Schedule
from prefect.schedules.clocks import CronClock
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.notifications.slack_task import SlackTask
# Parameter Task value given at runtime
city = Parameter(name="City", default="Walnut Creek")
# PrefectSecret Task stored in Cloud UI
api_key = PrefectSecret("WEATHER_API_KEY")
# Extraction Task pulls 5-day, 3-hour forcast for the provided City
@task(max_retries=2, retry_delay=datetime.timedelta(seconds=5))
def pull_forecast(city, api_key):
base_url = "http://api.openweathermap.org/data/2.5/forecast?"
url = base_url + "appid=" + api_key + "&q=" + city
r = requests.get(url)
r.raise_for_status()
data = r.json()
return data
# Analysis Task determines whether there will be rain in forecast data
@task(tags=["analysis", "parser"])
def is_raining_this_week(data):
rain = [
forecast["rain"].get("3h", 0) for forecast in data["list"] if "rain" in forecast
]
return True if sum([s >= 1 for s in rain]) >= 1 else False
# SlackTask integration enables notifications to Slack through webhooks
rain_notification, dry_notification = SlackTask(
message="There is rain in the forecast for this week, watch out!",
webhook_secret="SLACK_WEBHOOK_URL_MHQ",
), SlackTask(
message="There will be NO rain this week, someone tell Toto!",
webhook_secret="SLACK_WEBHOOK_URL_MHQ",
)
# Define the Flow
with Flow("Rain Flow") as flow:
forecast = pull_forecast(city=city, api_key=api_key)
rain = is_raining_this_week(forecast)
with case(rain, True):
rain_notification()
with case(rain, False):
dry_notification()
# Register to Prefect Cloud in designated project
flow.register(project_name="Demos")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment