-
-
Save craffel/a1e2aff893776d0ef2b0a95ed0fd7e7a to your computer and use it in GitHub Desktop.
Stack Exchange dump parser — converts a site's XML data dump into per-question plain-text files with author/license metadata.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import bs4 | |
import collections | |
import datetime | |
import json | |
import math | |
import os | |
import sys | |
from xml.dom import minidom | |
import commonmark | |
import tqdm | |
# Command-line interface for the script; arguments are read in the
# __main__ block at the bottom of the file.
parser = argparse.ArgumentParser("Parse a stack exchange site.")
parser.add_argument("--path", type=str, help="Path to a stack exchange dump.")
parser.add_argument("--site", type=str, help="Site name (defaults to path).")
parser.add_argument("--output", type=str, help="Where to write output.")
class Post:
    """Base class for anything carrying body text (questions, answers, comments)."""

    def __init__(self, text):
        # Plain-text body of the post.
        self.text = text
class Question(Post):
    """A question post: body text plus its authors, comments, and creation date."""

    def __init__(self, authors, text, comments, date):
        super().__init__(text)
        self.authors = authors
        self.comments = comments
        self.date = date
        # Answers get attached during the second parsing pass.
        self.answers = []
class Answer(Post):
    """An answer post: body text plus its authors and comments."""

    def __init__(self, authors, text, comments):
        super().__init__(text)
        self.authors = authors
        self.comments = comments
class Comment(Post):
    """A comment: body text plus its (single) author."""

    def __init__(self, author, text):
        super().__init__(text)
        self.author = author
def get_doc(path):
    """Parse the XML file at ``path`` and return all of its <row> elements."""
    document = minidom.parse(path)
    return document.getElementsByTagName("row")
def get_docs(path):
    """Parse the four dump files under ``path``.

    Returns a 4-tuple of <row> element lists in the order
    (posts, comments, users, post history).
    """
    filenames = ("Posts.xml", "Comments.xml", "Users.xml", "PostHistory.xml")
    return tuple(get_doc(os.path.join(path, name)) for name in filenames)
def get_attr(xml_object, key):
    """Return the value of attribute ``key`` on a DOM element, or None if absent."""
    attrs = xml_object.attributes
    if key not in attrs:
        return None
    return attrs[key].value
def get_html_text(html):
    """Strip markup from an HTML string and return its plain-text content."""
    return bs4.BeautifulSoup(html, "html.parser").get_text()
def get_body_text(xml_object):
    """Return the plain-text rendering of a post row's HTML "Body" attribute."""
    body_html = get_attr(xml_object, "Body")
    return get_html_text(body_html)
def simplify_date(date_string):
    """Convert a dump timestamp like "2010-07-19T06:55:26.860" to "2010/07/19".

    Fractional seconds (everything after the first ".") are discarded
    before parsing.
    """
    trimmed = date_string.partition(".")[0]
    parsed = datetime.datetime.strptime(trimmed, "%Y-%m-%dT%H:%M:%S")
    return parsed.strftime("%Y/%m/%d")
def get_markdown_text(xml_object):
    """Render a comment row's markdown "Text" attribute to plain text."""
    markdown = get_attr(xml_object, "Text")
    # commonmark produces HTML, which get_html_text then flattens to text.
    return get_html_text(commonmark.commonmark(markdown))
def split_id(post_id, max_id, digits):
    """Turn a numeric post id into a nested directory path.

    The id is zero-padded to the decimal width of ``max_id`` and split into
    chunks of ``digits`` characters from the right (the leftmost chunk may
    be shorter), e.g. ``split_id(42, 9999, 3)`` -> ``"0/042"``. This bounds
    the number of entries per directory.

    Args:
        post_id: Id to split (int or str of digits).
        max_id: Largest id in the dump; determines the padded width.
        digits: Maximum characters per path component.

    Returns:
        The chunks joined with ``os.path.join``.
    """
    # Count digits via str() rather than the previous 10**int(log10(...))
    # round-trip, which could miscount for large ids due to float rounding.
    max_digits = len(str(max_id))
    padded = str(post_id).zfill(max_digits)
    # Peel fixed-size chunks off the right end; any remainder stays leftmost.
    chunks = []
    end = len(padded)
    while end > 0:
        start = max(end - digits, 0)
        chunks.append(padded[start:end])
        end = start
    return os.path.join(*reversed(chunks))
def parse_dump(
    site, output_path, post_data, comment_data, user_data, history_data
):
    """Parse a Stack Exchange XML dump into per-question text/metadata files.

    For each question, writes ``<output_path>/<split id>/<id>/text.txt``
    (question + comments + answers + answer comments, newline-joined) and a
    sibling ``metadata.json`` (date, url, authors, source, license).

    Args:
        site: Base site URL, e.g. "https://stackoverflow.com".
        output_path: Root directory to write per-question folders into.
        post_data: <row> elements from Posts.xml.
        comment_data: <row> elements from Comments.xml.
        user_data: <row> elements from Users.xml.
        history_data: <row> elements from PostHistory.xml.
    """
    # Map user Id -> {profile URL, display name}; both are recorded so the
    # author list credits users by URL and by name.
    author_display = collections.defaultdict(set)
    for user in tqdm.tqdm(user_data):
        # DOM attribute values are strings, so compare against "-1" (the
        # synthetic Community user), not the int -1 which never matched.
        if get_attr(user, "Id") != "-1":
            author_display[get_attr(user, "Id")].update(
                {
                    "/".join([site, "users", get_attr(user, "Id")]),
                    get_attr(user, "DisplayName"),
                }
            )
    # Credit every user who ever revised a post, not just its creator.
    post_authors = collections.defaultdict(set)
    for revision in tqdm.tqdm(history_data):
        # "-1" as a string for the same reason as above; None means the
        # revision row has no UserId attribute (deleted user).
        if get_attr(revision, "UserId") not in ["-1", None]:
            post_authors[get_attr(revision, "PostId")].update(
                author_display[get_attr(revision, "UserId")]
            )
    # Group comments by the post they are attached to.
    comments = collections.defaultdict(list)
    for comment in tqdm.tqdm(comment_data):
        comments[get_attr(comment, "PostId")].append(
            Comment(
                author_display[get_attr(comment, "UserId")],
                get_markdown_text(comment)
            )
        )
    parsed_dump = {}
    # Have to do two passes, one for questions and one for answers
    for post in tqdm.tqdm(post_data):
        # PostTypeId "1" = question.
        if get_attr(post, "PostTypeId") == "1":
            parsed_dump[get_attr(post, "Id")] = Question(
                post_authors[get_attr(post, "Id")],
                f"{get_attr(post, 'Title')}\n{get_body_text(post)}",
                comments[get_attr(post, "Id")],
                simplify_date(get_attr(post, "CreationDate")),
            )
    for post in tqdm.tqdm(post_data):
        # PostTypeId "2" = answer.
        if get_attr(post, "PostTypeId") == "2":
            # Skip orphaned answers whose parent question is missing from
            # the dump (e.g. deleted) instead of raising KeyError.
            question = parsed_dump.get(get_attr(post, "ParentId"))
            if question is None:
                continue
            question.answers.append(
                Answer(
                    post_authors[get_attr(post, "Id")],
                    get_body_text(post),
                    comments[get_attr(post, "Id")],
                )
            )
    max_id = max(int(qid) for qid in parsed_dump.keys())
    for question_id, question in tqdm.tqdm(parsed_dump.items()):
        # Union of everyone who touched the thread: question/answer editors
        # plus authors of comments on the question and on every answer.
        all_authors = set(
            list(question.authors) +
            [auth for ans in question.answers for auth in ans.authors] +
            [
                n for c in question.comments if c.author is not None
                for n in c.author
            ] +
            [
                n for a in question.answers
                for c in a.comments if c.author is not None
                for n in c.author
            ]
        )
        metadata = {
            "date": question.date,
            "url": "/".join([site, "questions", question_id]),
            "authors": list(all_authors),
            "source": "Stack Exchange",
            "license": "CC-BY-SA",
        }
        # Thread text layout: question (+ its comments), then each answer
        # followed by its own comments.
        text = f"{question.text}\n"
        text += "".join([f"{c.text}\n" for c in question.comments])
        for answer in question.answers:
            text += f"{answer.text}\n"
            for comment in answer.comments:
                text += f"{comment.text}\n"
        question_path = os.path.join(
            output_path, split_id(question_id, max_id, 3), question_id
        )
        # exist_ok makes reruns over a partially written tree idempotent.
        os.makedirs(question_path, exist_ok=True)
        metadata_path = os.path.join(question_path, "metadata.json")
        with open(metadata_path, "w", encoding="utf-8") as f:
            json.dump(metadata, f)
        text_path = os.path.join(question_path, "text.txt")
        # Explicit utf-8 so writes don't depend on the locale encoding; the
        # encode/decode round-trip scrubs unencodable chars (lone surrogates).
        with open(text_path, "w", encoding="utf-8") as f:
            f.write(text.encode("utf-8", "replace").decode())
if __name__ == "__main__":
    args = parser.parse_args()
    # If the output directory already exists, assume the site was already
    # parsed and do nothing.
    if os.path.exists(args.output):
        sys.exit()
    # Fall back to the dump directory's basename when --site isn't given.
    site_name = args.site or os.path.split(args.path)[-1]
    parse_dump(
        f"https://{site_name}",
        args.output,
        *get_docs(args.path),
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment