Skip to content

Instantly share code, notes, and snippets.

@cicdw
Created February 25, 2019 01:02
Show Gist options
  • Save cicdw/749da99b68ed0c09790705d54902091a to your computer and use it in GitHub Desktop.
Save cicdw/749da99b68ed0c09790705d54902091a to your computer and use it in GitHub Desktop.
complete code of the standup Prefect flow
import datetime
from google.cloud.firestore import Client
import random
import requests
import prefect
from prefect import Flow, Parameter, task
from prefect.client import Secret
from prefect.schedules import CronSchedule
@task
def get_collection_name():
"""
Returns the current date, formatted, which maps
to a Google Firestore collection name.
"""
date_format = "%Y-%m-%d"
now = prefect.context["scheduled_start_time"]
return now.strftime(date_format)
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def get_latest_updates(date):
"""
Returns dictionary of team members -> their updates from a given date.
"""
client = Client(project="marvin-standup")
collection = client.collection(f"standup/{date}/users")
updates = collection.get()
user_dict = {doc.id: (doc.to_dict() or {}).get("updates") for doc in updates}
return user_dict
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def post_standup(updates, channel):
"""
Given a dictionary of updates and a Slack Channel ID,
posts updates to the given channel as the Marvin user.
"""
public_msg = (
f"<!here> are today's standup updates:\n"
+ "=" * 30
)
items = list(updates.items())
random.shuffle(items)
for user, update in items:
public_msg += f"\n*{user}*: {update}"
TOKEN = Secret("MARVIN_TOKEN").get() # use a Prefect Secret instead
params = {
"token": TOKEN,
"as_user": "true",
"link_names": "true",
"mrkdwn": "true",
"channel": channel,
"text": public_msg,
}
r = requests.post("https://slack.com/api/chat.postMessage", data=params)
r.raise_for_status()
if r.json()["ok"] is False:
raise ValueError(r.json().get("error", "Slack error"))
return r
weekday_schedule = CronSchedule("0 14 * * 1-5")
with Flow(name="post-standup", schedule=weekday_schedule) as flow:
standup_channel = Parameter("standup_channel", default="XXXXXXXXX")
res = post_standup(get_latest_updates(get_collection_name()), standup_channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment