Created
August 11, 2020 17:14
-
-
Save kmoonwright/22e99e9214e46868f790be9d61d9861b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pendulum | |
import requests | |
from prefect import task, Flow, Parameter, case | |
from prefect.schedules import Schedule | |
from prefect.schedules.clocks import CronClock | |
from prefect.tasks.secrets import PrefectSecret | |
from prefect.tasks.notifications.slack_task import SlackTask | |
# Parameter Task value given at runtime | |
city = Parameter(name="City", default="Walnut Creek") | |
# PrefectSecret Task stored in Cloud UI | |
api_key = PrefectSecret("WEATHER_API_KEY") | |
# Extraction Task pulls 5-day, 3-hour forcast for the provided City | |
@task(max_retries=2, retry_delay=datetime.timedelta(seconds=5)) | |
def pull_forecast(city, api_key): | |
base_url = "http://api.openweathermap.org/data/2.5/forecast?" | |
url = base_url + "appid=" + api_key + "&q=" + city | |
r = requests.get(url) | |
r.raise_for_status() | |
data = r.json() | |
return data | |
# Analysis Task determines whether there will be rain in forecast data | |
@task(tags=["analysis", "parser"]) | |
def is_raining_this_week(data): | |
rain = [ | |
forecast["rain"].get("3h", 0) for forecast in data["list"] if "rain" in forecast | |
] | |
return True if sum([s >= 1 for s in rain]) >= 1 else False | |
# SlackTask integration enables notifications to Slack through webhooks | |
rain_notification, dry_notification = SlackTask( | |
message="There is rain in the forecast for this week, watch out!", | |
webhook_secret="SLACK_WEBHOOK_URL_MHQ", | |
), SlackTask( | |
message="There will be NO rain this week, someone tell Toto!", | |
webhook_secret="SLACK_WEBHOOK_URL_MHQ", | |
) | |
# Define the Flow | |
with Flow("Rain Flow") as flow: | |
forecast = pull_forecast(city=city, api_key=api_key) | |
rain = is_raining_this_week(forecast) | |
with case(rain, True): | |
rain_notification() | |
with case(rain, False): | |
dry_notification() | |
# Register to Prefect Cloud in designated project | |
flow.register(project_name="Demos") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment