@geospiza-fortis
Last active May 16, 2021 22:36
Scripts for scraping and summarization
C:\"Program Files (x86)"\Google\Chrome\Application\chrome.exe `
--remote-debugging-port=9222
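The command above launches Chrome with its DevTools endpoint listening on port 9222; the scraper below attaches to that already-running browser via debuggerAddress instead of starting its own instance. As a quick sanity check (a minimal sketch, assuming the requests package is installed and Chrome is already running with the flag), the endpoint can be queried before running the scraper:

import requests

# Chrome serves DevTools metadata at /json/version when started with
# --remote-debugging-port; a successful response means Selenium can attach.
resp = requests.get("http://127.0.0.1:9222/json/version", timeout=5)
resp.raise_for_status()
print(resp.json().get("Browser"))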
from selenium import webdriver
import sys
from pathlib import Path
from tqdm.auto import tqdm
try:
    url = sys.argv[1]
except IndexError:
    raise ValueError("must provide url to thread")

output = Path(__file__).parent.parent / "data" / "html"

# attach to the Chrome instance started with --remote-debugging-port=9222
options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", "127.0.0.1:9222")
driver = webdriver.Chrome(options=options)
print("loaded driver")

driver.get(url)
el = driver.find_element_by_xpath("//div[contains(@class, 'PageNav')]")
page = int(el.get_attribute("data-page"))
total = int(el.get_attribute("data-last"))
if page != 1:
    raise ValueError(f"URL not a first page of a thread: {url}")

name = driver.current_url.rstrip("/").split("/")[-1]
(output / name).mkdir(parents=True, exist_ok=True)

for page in tqdm(range(total)):
    # save the page the browser is currently on
    with (output / name / f"page-{page+1:02d}.html").open("wb") as fp:
        fp.write(driver.page_source.encode())
    if page + 1 == total:
        break
    # navigate to the next page of the thread
    new_url = "/".join(driver.current_url.split("/")[:-1] + [f"page-{page+2}"])
    driver.get(new_url)
print("done")
from datetime import datetime
from copy import copy
from pathlib import Path

import pandas as pd
from bs4 import BeautifulSoup


def extract_data(post):
    article = copy(post.find("article"))
    # links back to the posts being quoted
    references = [
        a["href"].split("#")[-1]
        for a in article.find_all("a", class_="AttributionLink")
    ]
    # drop quoted text so the body only contains the post itself
    for e in article.find_all("div", class_="bbCodeQuote"):
        e.decompose()
    time_el = post.find("span", class_="DateTime")
    if time_el:
        time = time_el["title"]
    else:
        # older posts render the timestamp as an abbr element
        try:
            time_el = post.find("abbr", class_="DateTime")
            time = f'{time_el["data-datestring"]} at {time_el["data-timestring"]}'
        except (TypeError, KeyError):
            print(post)
            raise ValueError("could not parse post timestamp")
    return dict(
        author=post["data-author"],
        post_id=post["id"],
        timestamp=datetime.strptime(time, "%b %d, %Y at %I:%M %p").isoformat(),
        references=references,
        body=article.getText().strip(),
    )


def parse_html(path):
    data = []
    for page in sorted(Path(path).glob("*.html")):
        text = page.read_text(encoding="utf-8")
        soup = BeautifulSoup(text, "html.parser")
        posts = soup.find(id="messageList").find_all("li", class_="message")
        data += [extract_data(p) for p in posts]
    return pd.DataFrame(data)


for path in sorted(Path("../data/html").glob("*")):
    print(f"parsing {path.name}")
    df = parse_html(path)
    df["thread_id"] = path.name
    output = Path("../data/parsed")
    output.mkdir(parents=True, exist_ok=True)
    df.to_json(str(output / f"{path.name}.json"), orient="records")
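Each parsed record carries the post author, post id, ISO-formatted timestamp, references to any quoted posts, the body text with quotes stripped, and the thread id. A small sketch of reading one file back (the thread name here is a placeholder for any file under ../data/parsed):

import pandas as pd

# "example-thread.12345" is hypothetical; substitute a real parsed file
df = pd.read_json("../data/parsed/example-thread.12345.json")
print(df.columns.tolist())
# expected: ['author', 'post_id', 'timestamp', 'references', 'body', 'thread_id']
print(len(df), "posts from", df.timestamp.min(), "to", df.timestamp.max())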
import pandas as pd
from transformers import pipeline
from tqdm.auto import tqdm
from pathlib import Path
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=0)

def token_slide(text, k=800, window=700):
    # slide a window of k words over the text, advancing by `window` words,
    # so consecutive chunks overlap by k - window words
    split = text.split()
    splits = []
    for i in range(len(split) // window + 1):
        off = i * window
        splits.append(" ".join(split[off:off + k]))
    return splits


def summarize(chunks, k=6, max_length=100):
    # run the summarizer over chunks in batches of k
    res = []
    chunked = [chunks[i:i + k] for i in range(0, len(chunks), k)]
    for s in tqdm(chunked):
        summed = summarizer(s, truncation=True, max_length=max_length, min_length=4)
        res += [x["summary_text"] for x in summed]
    return res


def concat(chunks, k=8):
    # join every k consecutive chunks into a single string
    return [" ".join(chunks[i:i + k]) for i in range(0, len(chunks), k)]


def process(df):
    total = df.body.apply(lambda x: len([t for t in x.split() if t])).sum()
    text = "\n".join(df.body.values)
    windows = token_slide(text)
    pass_0 = summarize(windows, max_length=100)
    # concat outputs of max length 100 so each second-pass input is ~800 words
    pass_1 = summarize(concat(pass_0))
    return dict(
        medium=pass_0,
        short=pass_1,
        total_len=total,
        medium_len=len(" ".join(pass_0).split()),
        short_len=len(" ".join(pass_1).split()),
    )


def write(name, res, path):
    short = "\n\n".join(res["short"])
    medium = "\n\n".join(res["medium"])
    total_len = res["total_len"]
    text = f"""
## {name}
thread: https://forum.maplelegends.com/index.php?threads/{name}
- original: {total_len} words
- medium: {res["medium_len"]} words ({res["medium_len"]/total_len*100:.1f}% of original)
- short: {res["short_len"]} words ({res["short_len"]/total_len*100:.1f}% of original)
### short summary
{short}
### medium summary
{medium}
""".strip() + "\n"
    with open(path, "w") as fp:
        fp.write(text)


root = Path("../data/parsed")
output = Path("../data/summarized")
output.mkdir(parents=True, exist_ok=True)
for path in sorted(root.glob("*.json")):
    print(f"processing {path}")
    df = pd.read_json(path)
    res = process(df)
    write(path.name.split(".json")[0], res, output / path.name.replace(".json", ".md"))
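The two passes are sized to keep every summarizer input near BART's effective budget: token_slide emits roughly 800-word windows that overlap by 100 words (k=800, window=700), the first pass compresses each window to at most 100 tokens, and concat(k=8) stitches eight of those summaries back into roughly 800-word inputs for the second pass. A rough sketch of that arithmetic (word counts stand in for tokens, which is an approximation):

k, window, first_pass_max, concat_k = 800, 700, 100, 8

# consecutive windows share k - window words of context
overlap = k - window  # 100 words

# each first-pass summary is at most ~first_pass_max words, so joining
# concat_k of them yields second-pass inputs of roughly the window size
second_pass_input = first_pass_max * concat_k  # ~800 words
print(overlap, second_pass_input)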