Skip to content

Instantly share code, notes, and snippets.

@pombredanne
Forked from cicdw/post_standup_flow.py
Created December 2, 2019 00:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pombredanne/4b43b7fb4a508ee6a8108d4a2364ce0a to your computer and use it in GitHub Desktop.
Save pombredanne/4b43b7fb4a508ee6a8108d4a2364ce0a to your computer and use it in GitHub Desktop.
Complete code of the standup Prefect flow.
import datetime
from google.cloud.firestore import Client
import random
import requests
import prefect
from prefect import Flow, Parameter, task
from prefect.client import Secret
from prefect.schedules import CronSchedule
@task
def get_collection_name():
    """
    Build the date string that maps to a Google Firestore collection name.

    The date is taken from the ``scheduled_start_time`` entry of the Prefect
    context (not the wall clock) and rendered as ``YYYY-MM-DD``.
    """
    scheduled_start = prefect.context["scheduled_start_time"]
    return scheduled_start.strftime("%Y-%m-%d")
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def get_latest_updates(date):
    """
    Fetch the standup updates recorded for a given date.

    Reads every document in the ``standup/{date}/users`` collection of the
    ``marvin-standup`` Firestore project and returns a dictionary mapping
    each document id (the team member) to the value stored under that
    document's ``updates`` key. The value is ``None`` when the document is
    empty or has no ``updates`` key.
    """
    firestore = Client(project="marvin-standup")
    documents = firestore.collection(f"standup/{date}/users").get()

    updates_by_user = {}
    for document in documents:
        user = document.id
        # to_dict() may yield nothing for an empty document; fall back to {}.
        fields = document.to_dict() or {}
        updates_by_user[user] = fields.get("updates")
    return updates_by_user
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def post_standup(updates, channel):
    """
    Given a dictionary of updates and a Slack Channel ID,
    posts updates to the given channel as the Marvin user.

    Args:
        updates: dictionary mapping a user name to that user's update text.
        channel: Slack channel ID the message is posted to.

    Returns:
        The ``requests`` response object of the ``chat.postMessage`` call.

    Raises:
        requests.exceptions.RequestException: if the request times out,
            the connection fails, or Slack answers with an HTTP error status.
        ValueError: if the response body does not report ``"ok": true``;
            the message is Slack's ``error`` field when present.
    """
    # Shuffle so the users are not listed in the same order on every post.
    items = list(updates.items())
    random.shuffle(items)

    header = "<!here> are today's standup updates:\n" + "=" * 30
    public_msg = "\n".join(
        [header] + [f"*{user}*: {update}" for user, update in items]
    )

    # The token is read from a Prefect Secret so it is never hard-coded here.
    token = Secret("MARVIN_TOKEN").get()
    params = {
        "token": token,
        "as_user": "true",
        "link_names": "true",
        "mrkdwn": "true",
        "channel": channel,
        "text": public_msg,
    }

    # Without a timeout a stalled connection would block the task forever
    # and the task-level retries would never get a chance to run.
    r = requests.post(
        "https://slack.com/api/chat.postMessage", data=params, timeout=10
    )
    r.raise_for_status()

    # Slack reports application-level failures in the body, not the HTTP
    # status, so parse the payload once and check its "ok" flag.
    payload = r.json()
    if not payload.get("ok"):
        raise ValueError(payload.get("error", "Slack error"))
    return r
# Cron expression: minute 0 of hour 14, Monday through Friday.
weekday_schedule = CronSchedule("0 14 * * 1-5")

with Flow(name="post-standup", schedule=weekday_schedule) as flow:
    # Channel to post to; the default is a placeholder to be overridden.
    standup_channel = Parameter("standup_channel", default="XXXXXXXXX")

    # Chain the tasks: collection name -> that day's updates -> Slack post.
    collection_name = get_collection_name()
    latest_updates = get_latest_updates(collection_name)
    res = post_standup(latest_updates, standup_channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment