Created
March 23, 2022 21:56
-
-
Save taljaards/a3d3dc74c010ec381f02b884687d7a42 to your computer and use it in GitHub Desktop.
A simple sample flow to check for (new) job openings at Prefect and send links to them via Telegram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import os | |
import urllib.parse | |
from collections import namedtuple | |
from typing import List | |
import pandas as pd | |
import requests | |
import telegram | |
from bs4 import BeautifulSoup | |
from prefect import flow, task | |
# One scraped posting: greenhouse job id, board section, title, absolute URL,
# and the timestamp at which it was scraped.
Job = namedtuple("Job", "id category title url datetime")
def create_folder(path):
    """Create directory *path* (including parents) if it does not exist."""
    # exist_ok avoids the check-then-create race of the original
    # os.path.exists() + os.makedirs() pair.
    os.makedirs(path, exist_ok=True)
def save_html(soup, file_name):
    """Serialize *soup* (anything with a sensible str()) to ``<file_name>.html``."""
    target = f"{file_name}.html"
    with open(target, "w") as fh:
        fh.write(str(soup))
def open_html_soup(file_name):
    """Parse ``<file_name>.html`` and return a BeautifulSoup tree."""
    # Read via a context manager: the original passed a bare open() file
    # object straight to BeautifulSoup, leaking the handle until GC.
    with open(f"{file_name}.html", "r") as fh:
        return BeautifulSoup(fh.read(), "html.parser")
def safesoup(response, file_name):
    """Back up *response*'s HTML under data/temp and return it parsed as soup.

    Args:
        response: object exposing ``.text`` (e.g. requests.Response).
        file_name: base name (no extension) for the backup file.

    Returns:
        BeautifulSoup tree parsed from the saved backup file.
    """
    # os.path.join instead of the original hard-coded Windows
    # backslash literal r"data\temp", so the script runs on any OS.
    temp_path = os.path.join("data", "temp")
    create_folder(temp_path)
    file_path = os.path.join(temp_path, file_name)
    save_html(response.text, file_path)
    return open_html_soup(file_path)
# Greenhouse board to scrape for Prefect job postings.
base_url = "https://boards.greenhouse.io/prefect"
# NOTE(review): evaluated once at import time, so every flow run in a
# long-lived / scheduled process reuses the same timestamp — confirm intended.
current_dt = datetime.datetime.now()
@task
def get_job_openings() -> List[Job]:
    """Scrape the Prefect Greenhouse board and return all postings as Jobs.

    Returns:
        List of Job tuples (id, category, title, url, datetime). The category
        is taken from the most recent <h3> heading preceding each posting.
    """
    with requests.Session() as session:
        # Navigate to site
        response = session.get(base_url)
        # Backup html just in case, and create soup
        soup = safesoup(response, "greenhouse_prefect")
        # Timestamp per run: the module-level current_dt is frozen at import
        # time, which is wrong for long-running or scheduled deployments.
        scraped_at = datetime.datetime.now()
        jobs = []
        # Guard: a posting appearing before any <h3> heading no longer
        # raises NameError (category was previously unbound in that case).
        category = None
        for opening in soup.select(".level-0 :nth-child(1)"):
            if opening.name == "h3":
                # Job category heading; applies to subsequent postings.
                category = opening.text
            else:
                # Job posting link under the current category.
                job_url = urllib.parse.urljoin(base_url, opening["href"])
                job_id = job_url.split("/")[-1]
                jobs.append(Job(job_id, category, opening.text, job_url, scraped_at))
    print(f"Found {len(jobs)} job openings")
    return jobs
@task
def jobs_to_df(jobs):
    """Convert an iterable of Job tuples into a DataFrame indexed by job id."""
    frame = pd.DataFrame(jobs)
    frame = frame.astype({"id": str})
    return frame.set_index("id", drop=True)
@task
def save_to_df_csv(new_df):
    """Merge *new_df* into the on-disk CSV of known jobs and return new rows.

    Args:
        new_df: DataFrame of scraped jobs indexed by job id.

    Returns:
        DataFrame containing only the rows whose id was not already present
        in the saved CSV (empty when nothing is new).
    """
    # os.path.join instead of the Windows-only backslash literal.
    file_name = os.path.join("data", "job_openings.csv")
    try:
        original_df = pd.read_csv(file_name).astype({"id": str}).set_index("id", drop=True)
    except FileNotFoundError:
        # First run: empty frame with the Job schema so the logic below works.
        original_df = pd.DataFrame(columns=Job._fields).set_index("id", drop=True)
    # New rows are those scraped now whose id is absent from the saved CSV.
    # (The original drop_duplicates(keep=False) also surfaced ids present
    # ONLY in the old CSV — i.e. delisted jobs — misreporting them as new.)
    new_rows = new_df.loc[~new_df.index.isin(original_df.index)]
    print(f"New job openings: {len(new_rows)}")
    if not new_rows.empty:
        # Persist the union, keeping the first (previously saved) copy of
        # any id that appears in both frames.
        combined_df = pd.concat([original_df, new_df])
        new_clean_df = combined_df.reset_index().drop_duplicates(subset="id", keep="first").set_index("id")
        # Ensure the data/ folder exists before writing.
        os.makedirs(os.path.dirname(file_name), exist_ok=True)
        new_clean_df.to_csv(file_name)
    return new_rows
@task
def msg_new_jobs(df):
    """Send a Telegram message listing the new job openings in *df*.

    Args:
        df: DataFrame of new jobs with ``title``, ``url``, ``category`` and
            ``datetime`` columns. Reads TELEGRAM_TOKEN and TELEGRAM_CHAT_ID
            from the environment (raises KeyError when missing).
    """
    token = os.environ["TELEGRAM_TOKEN"]
    chat_id = os.environ["TELEGRAM_CHAT_ID"]
    bot = telegram.Bot(token=token)
    # Work on a copy so the caller's frame is not mutated with the
    # throwaway "hyperlink" column.
    df = df.copy()
    df["hyperlink"] = "[" + df["title"] + "](" + df["url"] + ")"
    lines = ["*New Prefect job openings*"]
    for category, rows in df.drop(["datetime"], axis="columns").groupby("category"):
        lines.append(f"_{category}:_")
        lines.extend(rows["hyperlink"].tolist())
    # "Markdown" is the value documented by the Telegram Bot API
    # (the original sent "MarkDown").
    bot.send_message(chat_id=chat_id, text="\n".join(lines), parse_mode="Markdown")
@flow(name="Check for new job openings")
def check_job_openings():
    """Scrape openings, persist them, and notify via Telegram when new ones appear."""
    scraped = get_job_openings()
    jobs_df = jobs_to_df(scraped)
    new_jobs = save_to_df_csv(jobs_df)
    # Only message when the diff against the saved CSV is non-empty.
    if not new_jobs.result().empty:
        msg_new_jobs(new_jobs)
# Run the flow directly when this file is executed as a script.
if __name__ == "__main__":
    check_job_openings()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment