Download the labor law posters required to be sent to employees
#
# Copyright 2022 Andrew Chen Wang, Ur LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Each state has their own labor law posters, so we need
to download them and send them to our employees to abide by the law.
"""
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Tuple, Union

import aiofiles
from aiofiles import os
from aiohttp import ClientSession
from bs4 import BeautifulSoup

MAIN_STORAGE_DIRECTORY = Path(__file__).parent / "files"

headers = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.4 Safari/605.1.15"
}
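# A desktop-browser User-Agent, presumably so requests look like a regular
# browser to the poster hosts; even so, at least one host still returns a
# 403 (see the note in _attempt_2).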


class ApiError(Exception):
    pass


AttemptReturnT = Union[Tuple[str, bytes], ApiError]
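# Each _attempt_* coroutine returns (resolved URL, PDF bytes) on success, or
# an ApiError instance (returned rather than raised) on failure.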


async def _attempt_1(session, soup: BeautifulSoup) -> AttemptReturnT:
    """Attempt to download the PDF embedded in the labor law site's own iframe."""
    link = soup.select_one("object iframe").attrs["src"]
    # Clean
    if link.startswith("/"):
        link = f"https://www.laborposters.org{link}"
    elif link.startswith("https://docs.google.com/viewer?url="):
        link = link.removeprefix("https://docs.google.com/viewer?url=")
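    # For illustration (hypothetical URL): the viewer-prefix branch turns
    #   https://docs.google.com/viewer?url=https://example.com/poster.pdf
    # into
    #   https://example.com/poster.pdf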
    # Run
    async with session.get(link) as r:
        if not r.ok:
            return ApiError(f"[4a] Failed for {r.url}")
        return link, await r.read()


async def _attempt_2(session, soup: BeautifulSoup) -> AttemptReturnT:
    """Attempt to find a valid original-source link for the PDF."""
    pdf_link = (
        soup.find(text=lambda t: "Original poster PDF" in t)
        .parent.select_one("a")
        .attrs["href"]
    )
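    # The chained lookup above assumes markup roughly like the following
    # (inferred from the selectors, not verified against the live page):
    #   <li>Original poster PDF: <a href="https://.../poster.pdf">...</a></li>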
    # This single California link sends a 403, probably because of our
    # HTTP agent: https://www.dfeh.ca.gov/wp-content/uploads/sites/32/2020/10/Workplace-Discrimination-Poster_ENG.pdf
    async with session.get(pdf_link) as r:
        if not r.ok:
            return ApiError(f"[4b] Failed for {r.url}")
        return pdf_link, await r.read()


async def get_pdf(session, state: str, text: str, original_url: str):
    soup = BeautifulSoup(text, "html.parser")
    errors = []
    for i, attempt in enumerate([_attempt_1, _attempt_2], start=1):
        try:
            response = await attempt(session, soup)
            if isinstance(response, ApiError):
                raise response
            new_link, content = response
            # Post-processing
            file_name = new_link.split("/")[-1].split("&")[0].split("?")[0]
            if "." not in file_name:
                file_name = f"{file_name}.pdf"
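            # e.g. "https://example.com/files/poster.pdf?dl=1" -> "poster.pdf",
            # and "https://example.com/viewer/12345" -> "12345.pdf"
            # (hypothetical URLs for illustration)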
            file = str(MAIN_STORAGE_DIRECTORY.absolute() / state / file_name)
            if await os.path.exists(file):
                return
            async with aiofiles.open(file, "wb") as f:
                await f.write(content)
            break
        except Exception as e:
            errors.append(f"Attempt {i} for {original_url}. Error:\n{e}")
            continue
    else:
        print("Errors for all attempts:")
        for e in errors:
            print(e)
        raise AssertionError(
            f"[4] Couldn't identify a valid poster link for {original_url}"
        )


async def get_poster(session, state, link):
    async with session.get(link) as r:
        assert r.ok, f"[3] Failed for {r.url}"
        content = await r.text()
    await get_pdf(session, state, content, link)
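

# Note: despite its name, `main` below handles a single state's poster
# listing page; `begin` is the actual entry point.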
async def main(session, state, link):
    async with session.get(link) as r:
        assert r.ok, f"[2] Failed for {link}"
        posters = BeautifulSoup(await r.text(), "html.parser").select(
            ".tab-content .poster-name a"
        )
    poster_links = [a.attrs["href"] for a in posters]
    tasks = []
    for poster_link in poster_links:
        tasks.append(
            asyncio.create_task(get_poster(session, state, poster_link))
        )
    await asyncio.gather(*tasks)


async def begin():
    async with ClientSession(headers=headers) as session:
        async with session.get("https://www.laborposters.org/") as r:
            assert r.ok, f"[1] Failed for {r.url}"
            text = await r.text()
        tbody = BeautifulSoup(text, "html.parser").select_one(".sf-al").parent.parent
        links = [
            (
                tr.select_one("a").text,
                tr.select("a")[0]["href"],
            )
            for tr in tbody.select("tr")
        ]
        for state, _ in links:
            pdf_path = MAIN_STORAGE_DIRECTORY / state
            pdf_path.mkdir(parents=True, exist_ok=True)
        await asyncio.gather(
            *[main(session, state, link) for state, link in links]
        )


if __name__ == "__main__":
    print(f"Starting download at {datetime.now()}")
    asyncio.run(begin())
    print(f"Finished download at {datetime.now()}")