Skip to content

Instantly share code, notes, and snippets.

@abhishekmishragithub
Forked from cicdw/post_standup_flow.py
Created September 1, 2021 09:08
Show Gist options
  • Save abhishekmishragithub/8a483291214a758b05327887ffdc7583 to your computer and use it in GitHub Desktop.
Save abhishekmishragithub/8a483291214a758b05327887ffdc7583 to your computer and use it in GitHub Desktop.
Complete code for the standup Prefect flow.
import datetime
from google.cloud.firestore import Client
import random
import requests
import prefect
from prefect import Flow, Parameter, task
from prefect.client import Secret
from prefect.schedules import CronSchedule
@task
def get_collection_name():
    """
    Return the run's scheduled start time as a ``YYYY-MM-DD`` string.

    The timestamp is read from ``prefect.context["scheduled_start_time"]``,
    so the result reflects when the run was scheduled rather than the wall
    clock at execution time. The flow passes this string on as the date
    segment of the Firestore collection path.
    """
    return prefect.context["scheduled_start_time"].strftime("%Y-%m-%d")
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def get_latest_updates(date):
    """
    Fetch the standup updates stored in Firestore for one date.

    Reads every document in the ``standup/<date>/users`` collection of the
    ``marvin-standup`` project and returns a dict mapping each document ID
    (presumably the team member's name -- confirm against the writer side)
    to the value of that document's ``"updates"`` field. The value is
    ``None`` when the document has no data or no ``"updates"`` field.
    """
    firestore = Client(project="marvin-standup")
    snapshots = firestore.collection(f"standup/{date}/users").get()

    updates_by_user = {}
    for snapshot in snapshots:
        # to_dict() may yield None for an empty document; fall back to {}.
        fields = snapshot.to_dict() or {}
        updates_by_user[snapshot.id] = fields.get("updates")
    return updates_by_user
@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def post_standup(updates, channel):
    """
    Post the collected standup updates to a Slack channel.

    Builds a single message -- a header line, a separator, then one
    ``*user*: update`` line per entry in random order -- and sends it via
    Slack's ``chat.postMessage`` endpoint using the token stored in the
    ``MARVIN_TOKEN`` Prefect Secret.

    Args:
        updates: dict mapping a user name to that user's update text.
        channel: Slack channel ID to post the message to.

    Returns:
        The ``requests.Response`` returned by Slack.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if Slack's reply is not valid JSON or does not report
            ``"ok": true``; the message is Slack's ``error`` field when
            present.
    """
    # Shuffle so nobody is always listed first (or last).
    entries = list(updates.items())
    random.shuffle(entries)

    lines = ["<!here> are today's standup updates:", "=" * 30]
    lines.extend(f"*{user}*: {update}" for user, update in entries)
    public_msg = "\n".join(lines)

    params = {
        "token": Secret("MARVIN_TOKEN").get(),
        "as_user": "true",
        "link_names": "true",
        "mrkdwn": "true",
        "channel": channel,
        "text": public_msg,
    }
    # Without a timeout a stalled connection would block the task forever
    # and the retries configured on this task would never get a chance to run.
    r = requests.post(
        "https://slack.com/api/chat.postMessage", data=params, timeout=30
    )
    r.raise_for_status()

    # Slack reports application-level failures in the JSON body with HTTP 200,
    # so the status check above is not sufficient on its own.
    payload = r.json()
    if not payload.get("ok"):
        raise ValueError(payload.get("error", "Slack error"))
    return r
# Cron expression: minute 0, hour 14, Monday through Friday.
weekday_schedule = CronSchedule("0 14 * * 1-5")

with Flow("post-standup", schedule=weekday_schedule) as flow:
    # Slack channel ID to post to; the default is a placeholder and is
    # expected to be overridden per run or replaced with a real ID.
    standup_channel = Parameter(name="standup_channel", default="XXXXXXXXX")

    # Pipeline: date string -> Firestore updates for that date -> Slack post.
    res = post_standup(
        updates=get_latest_updates(date=get_collection_name()),
        channel=standup_channel,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment