@craffel
Created September 12, 2022 01:35
SE parser: parses a Stack Exchange data dump into per-question plain-text and metadata files.

import argparse
import collections
import datetime
import json
import math
import os
import sys
from xml.dom import minidom

import bs4
import commonmark
import tqdm

parser = argparse.ArgumentParser(description="Parse a Stack Exchange site.")
parser.add_argument("--path", type=str, help="Path to a Stack Exchange dump.")
parser.add_argument(
    "--site", type=str, help="Site name (defaults to the last component of --path)."
)
parser.add_argument("--output", type=str, help="Where to write output.")

class Post:
    def __init__(self, text):
        self.text = text


class Question(Post):
    def __init__(self, authors, text, comments, date):
        self.authors = authors
        self.comments = comments
        self.date = date
        self.answers = []
        super().__init__(text)


class Answer(Post):
    def __init__(self, authors, text, comments):
        self.authors = authors
        self.comments = comments
        super().__init__(text)


class Comment(Post):
    def __init__(self, author, text):
        self.author = author
        super().__init__(text)

def get_doc(path):
    return minidom.parse(path).getElementsByTagName("row")


def get_docs(path):
    # Each table in a Stack Exchange dump is a single XML file of <row> elements.
    return (
        get_doc(os.path.join(path, "Posts.xml")),
        get_doc(os.path.join(path, "Comments.xml")),
        get_doc(os.path.join(path, "Users.xml")),
        get_doc(os.path.join(path, "PostHistory.xml")),
    )

def get_attr(xml_object, key):
    # Return an attribute's value, or None if the row doesn't have it.
    if key in xml_object.attributes:
        return xml_object.attributes[key].value
    else:
        return None

def get_html_text(html):
    # Post bodies are stored as HTML; strip the tags to get plain text.
    soup = bs4.BeautifulSoup(html, "html.parser")
    return soup.get_text()


def get_body_text(xml_object):
    return get_html_text(get_attr(xml_object, "Body"))

def simplify_date(date_string):
    # Drop the fractional seconds and the time of day, keeping only the date.
    date = datetime.datetime.strptime(
        date_string.split(".")[0], "%Y-%m-%dT%H:%M:%S"
    )
    return date.strftime("%Y/%m/%d")

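# For example, simplify_date("2022-09-12T01:35:00.123") returns "2022/09/12".
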
def get_markdown_text(xml_object):
    # Comments are stored as Markdown; render to HTML, then strip to plain text.
    return get_html_text(commonmark.commonmark(get_attr(xml_object, "Text")))

def split_id(post_id, max_id, digits):
    # Turn a numeric post id into a nested directory path so that no single
    # directory accumulates too many entries.
    max_digits = len(str(max_id))  # number of digits in the largest id
    post_id = str(post_id).zfill(max_digits)
    chunk_idx = range(math.ceil(max_digits / digits))
    chunks = [post_id[::-1][i * digits:i * digits + digits][::-1] for i in chunk_idx]
    return os.path.join(*chunks[::-1])

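# Worked example (illustrative values): with max_id = 99999, post id 1234 is
# zero-padded to "01234" and split from the right into 3-digit chunks, so
# split_id(1234, 99999, 3) == os.path.join("01", "234").
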
def parse_dump(
    site, output_path, post_data, comment_data, user_data, history_data
):
    # Map each user id to the set of ways to credit them: profile URL and
    # display name. User id -1 is the automated Community user; skip it.
    # (Attribute values are strings, so compare against "-1", not -1.)
    author_display = collections.defaultdict(set)
    for user in tqdm.tqdm(user_data):
        if get_attr(user, "Id") != "-1":
            author_display[get_attr(user, "Id")].update(
                {
                    "/".join([site, "users", get_attr(user, "Id")]),
                    get_attr(user, "DisplayName"),
                }
            )
    # Credit every user who appears in a post's revision history.
    post_authors = collections.defaultdict(set)
    for revision in tqdm.tqdm(history_data):
        if get_attr(revision, "UserId") not in ("-1", None):
            post_authors[get_attr(revision, "PostId")].update(
                author_display[get_attr(revision, "UserId")]
            )
    # Group comments by the post they belong to.
    comments = collections.defaultdict(list)
    for comment in tqdm.tqdm(comment_data):
        comments[get_attr(comment, "PostId")].append(
            Comment(
                author_display[get_attr(comment, "UserId")],
                get_markdown_text(comment),
            )
        )
    parsed_dump = {}
    # Two passes over the posts: questions first, then answers, so that each
    # answer's parent question already exists when the answer is attached.
    for post in tqdm.tqdm(post_data):
        if get_attr(post, "PostTypeId") == "1":  # 1 = question
            parsed_dump[get_attr(post, "Id")] = Question(
                post_authors[get_attr(post, "Id")],
                f"{get_attr(post, 'Title')}\n{get_body_text(post)}",
                comments[get_attr(post, "Id")],
                simplify_date(get_attr(post, "CreationDate")),
            )
    for post in tqdm.tqdm(post_data):
        if get_attr(post, "PostTypeId") == "2":  # 2 = answer
            parsed_dump[get_attr(post, "ParentId")].answers.append(
                Answer(
                    post_authors[get_attr(post, "Id")],
                    get_body_text(post),
                    comments[get_attr(post, "Id")],
                )
            )
    max_id = max(int(qid) for qid in parsed_dump)
    for question_id, question in tqdm.tqdm(parsed_dump.items()):
        # Union of everyone who contributed to the question, its answers, or
        # any comment (comment authors are always sets, possibly empty for
        # deleted users, so no None check is needed).
        all_authors = set(
            list(question.authors)
            + [auth for ans in question.answers for auth in ans.authors]
            + [n for c in question.comments for n in c.author]
            + [n for a in question.answers for c in a.comments for n in c.author]
        )
        metadata = {
            "date": question.date,
            "url": "/".join([site, "questions", question_id]),
            "authors": list(all_authors),
            "source": "Stack Exchange",
            "license": "CC-BY-SA",
        }
        # Concatenate the question, its comments, then each answer followed
        # by that answer's comments.
        text = f"{question.text}\n"
        text += "".join(f"{c.text}\n" for c in question.comments)
        for answer in question.answers:
            text += f"{answer.text}\n"
            for comment in answer.comments:
                text += f"{comment.text}\n"
        question_path = os.path.join(
            output_path, split_id(question_id, max_id, 3), question_id
        )
        os.makedirs(question_path)
        metadata_path = os.path.join(question_path, "metadata.json")
        with open(metadata_path, "w") as f:
            json.dump(metadata, f)
        text_path = os.path.join(question_path, "text.txt")
        with open(text_path, "w") as f:
            # Replace any lone surrogates that can't be encoded as UTF-8.
            f.write(text.encode("utf-8", "replace").decode())
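
# With the digits=3 call above, question 1234 on a site whose largest question
# id is 99999 would be written to <output>/01/234/1234/metadata.json and
# <output>/01/234/1234/text.txt (illustrative ids, not from a real dump).
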
if __name__ == "__main__":
    args = parser.parse_args()
    # Refuse to clobber an existing output directory.
    if os.path.exists(args.output):
        sys.exit(f"Output path {args.output} already exists.")
    parse_dump(
        f"https://{args.site or os.path.split(args.path)[-1]}",
        args.output,
        *get_docs(args.path),
    )
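
A minimal usage sketch (the script filename and paths here are hypothetical; it
assumes a site's archive from the Stack Exchange data dump has been extracted
into a directory named after the site):

    python se_parser.py --path ai.stackexchange.com --output parsed/ai.stackexchange.com

The script exits immediately if --output already exists, so point it at a fresh
directory on each run.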