@taljaards
Created March 23, 2022 21:56
A simple sample flow to check for (new) job openings at Prefect and send links to them via Telegram

import datetime
import os
import urllib.parse
from collections import namedtuple
from typing import List

import pandas as pd
import requests
import telegram
from bs4 import BeautifulSoup
from prefect import flow, task

Job = namedtuple("Job", ["id", "category", "title", "url", "datetime"])


def create_folder(path):
    if not os.path.exists(path):
        os.makedirs(path)


def save_html(soup, file_name):
    with open(f"{file_name}.html", "w") as file:
        file.write(str(soup))


def open_html_soup(file_name):
    return BeautifulSoup(open(f"{file_name}.html", "r"), "html.parser")


def safesoup(response, file_name):
    temp_path = r"data\temp"
    create_folder(temp_path)
    file_path = os.path.join(temp_path, file_name)
    save_html(response.text, file_path)
    return open_html_soup(file_path)


base_url = "https://boards.greenhouse.io/prefect"
current_dt = datetime.datetime.now()


@task
def get_job_openings() -> List[Job]:
    with requests.Session() as session:
        # Navigate to the site
        response = session.get(base_url)
        # Back up the HTML just in case, and create the soup
        soup = safesoup(response, "greenhouse_prefect")
        # List for storing all scraped values
        jobs = []
        openings = soup.select(".level-0 :nth-child(1)")
        for opening in openings:
            if opening.name == "h3":
                # Job category heading; applies to the postings that follow it
                category = opening.text
            else:
                # Job posting
                job_title = opening.text
                job_url = urllib.parse.urljoin(base_url, opening["href"])
                job_id = job_url.split("/")[-1]
                jobs.append(Job(job_id, category, job_title, job_url, current_dt))
        print(f"Found {len(jobs)} job openings")
        return jobs


@task
def jobs_to_df(jobs):
    # Build a DataFrame from the scraped Job tuples, indexed by job id
    return pd.DataFrame(jobs).astype({"id": str}).set_index("id", drop=True)


@task
def save_to_df_csv(new_df):
    file_name = r"data\job_openings.csv"
    try:
        original_df = pd.read_csv(file_name).astype({"id": str}).set_index("id", drop=True)
    except FileNotFoundError:
        # First run: start from an empty DataFrame with the expected columns
        original_df = pd.DataFrame(columns=Job._fields).set_index("id", drop=True)
    combined_df = pd.concat([original_df, new_df])
    # Ids appearing only once in the combined frame were not seen on a previous run
    new_rows = combined_df.reset_index().drop_duplicates(subset="id", keep=False).set_index("id")
    print(f"New job openings: {len(new_rows)}")
    if not new_rows.empty:
        # Persist the deduplicated union so the next run diffs against it
        new_clean_df = combined_df.reset_index().drop_duplicates(subset="id", keep="first").set_index("id")
        new_clean_df.to_csv(file_name)
    return new_rows


@task
def msg_new_jobs(df):
    token = os.environ["TELEGRAM_TOKEN"]
    chat_id = os.environ["TELEGRAM_CHAT_ID"]
    bot = telegram.Bot(token=token)
    msg = ["*New Prefect job openings*"]
    # Render each opening as a Markdown link, grouped by category
    df["hyperlink"] = "[" + df["title"] + "](" + df["url"] + ")"
    for cat, rows in df.drop(["datetime"], axis="columns").groupby("category"):
        msg.append(f"_{cat}:_")
        msg.extend(rows["hyperlink"].tolist())
    msg = "\n".join(msg)
    bot.send_message(chat_id=chat_id, text=msg, parse_mode="Markdown")


@flow(name="Check for new job openings")
def check_job_openings():
    jobs = get_job_openings()
    jobs = jobs_to_df(jobs)
    new_jobs_df = save_to_df_csv(jobs)
    # Task calls return PrefectFutures inside a flow; .result() blocks for the DataFrame
    if not new_jobs_df.result().empty:
        msg_new_jobs(new_jobs_df)


if __name__ == "__main__":
    check_job_openings()
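
To try the flow locally, the bot token and target chat id must be in the environment before the messaging task runs, since msg_new_jobs reads them via os.environ. A minimal sketch of a one-off run; the two values below are hypothetical placeholders, not working credentials:

# Local-run sketch: export real values in your shell instead of hard-coding
# them; the placeholders below will not authenticate against Telegram.
import os

os.environ["TELEGRAM_TOKEN"] = "123456:ABC-bot-token"  # hypothetical placeholder
os.environ["TELEGRAM_CHAT_ID"] = "-1001234567890"      # hypothetical placeholder

check_job_openings()  # one run: scrape, diff against data\job_openings.csv, notify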
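To run the check unattended, the flow can be put on a schedule. A sketch assuming the early Prefect 2 beta deployment API current when this gist was written (DeploymentSpec was replaced in later releases, so adjust for newer versions); the deployment name is a made-up example:

# Scheduling sketch, assuming the Prefect 2 beta API of early 2022;
# "hourly-job-check" is a hypothetical deployment name.
from datetime import timedelta

from prefect.deployments import DeploymentSpec
from prefect.orion.schemas.schedules import IntervalSchedule

DeploymentSpec(
    flow=check_job_openings,
    name="hourly-job-check",
    schedule=IntervalSchedule(interval=timedelta(hours=1)),
)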