@kms70847 · April 26, 2018
Scrapes the star list from Stack Overflow's Python chat room and displays a ranking of the most-starred users.
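The script assumes Python 3.6+ (it uses f-strings) and the third-party packages requests, beautifulsoup4, and dateparser.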
import requests
from bs4 import BeautifulSoup as Soup
import json
import time
import datetime
import dateparser
from collections import defaultdict
import os
DUMP_NAME = "results.json"

#saves the results of the decorated function calls, even between executions of the script
def serialized(filename):
    def decorator(fn):
        def f(*args):
            #JSON-encode the arguments to get a serializable cache key
            key = json.dumps(args)
            try:
                with open(filename) as file:
                    data = json.load(file)
            except FileNotFoundError:
                data = {}
            if key not in data:
                data[key] = fn(*args)
                with open(filename, "w") as file:
                    json.dump(data, file)
            return data[key]
        return f
    return decorator
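#Illustrative sketch, not part of the scrape: a function wrapped with
#@serialized is only invoked for argument tuples it hasn't seen before;
#repeat calls are answered from the JSON file, even across runs.
#(slow_square and squares.json are hypothetical names.)
#
#    @serialized("squares.json")
#    def slow_square(n):
#        return n * n
#
#    slow_square(3)  #computes 9 and stores it under the key "[3]"
#    slow_square(3)  #reads squares.json and returns the cached 9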
@serialized("pages.json")
def get_page_text(page_number):
    url = f"https://chat.stackoverflow.com/rooms/info/6/python/?tab=stars&page={page_number}"
    return requests.get(url).text

def get_page(page_number):
    return Soup(get_page_text(page_number), "html.parser")
def parse_timestamp(timestamp):
    #the chat UI prefixes yesterday's timestamps with "yst"
    if timestamp.startswith("yst "):
        return parse_timestamp(timestamp.replace("yst ", "", 1)) - datetime.timedelta(days=1)
    result = dateparser.parse(timestamp)
    if result is None:
        raise Exception(f"Can't parse timestamp {repr(timestamp)}")
    return result
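#Illustrative only, assuming the chat UI's timestamp formats:
#    parse_timestamp("yst 10:41 PM")  #yesterday at 22:41
#    parse_timestamp("10:41 PM")      #today at 22:41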
def to_unix_epoch(my_datetime):
    #note: time.mktime interprets the datetime as local time
    return time.mktime(my_datetime.timetuple())
def fetch_data():
    first_page = get_page(1)
    #the pager is one-indexed, so subtract one to get the page count
    num_pages = max(int(x.text) for x in first_page.find_all(class_="page-numbers") if x.text.isdigit()) - 1
    print(f"Number of pages: {num_pages}")
    entries = []
    for i in range(1, num_pages+1):
        print(f"processing page {i} of {num_pages}.")
        soup = get_page(i)
        for raw_entry in soup.find_all(class_="monologue"):
            username_section = raw_entry.find(class_="username")
            username = username_section.text
            try:
                #profile links look like /users/<id>/<name>
                user_id = username_section.find("a").attrs["href"].split("/")[2]
            except AttributeError: #deleted users have no profile link
                user_id = None
            message_id = raw_entry.find(class_="message").find("a").attrs["name"]
            message = raw_entry.find(class_="content").decode_contents().strip()
            #the star count is blank for messages with exactly one star
            raw_count = raw_entry.find(class_="times").text
            count = int(raw_count) if raw_count else 1
            raw_timestamp = raw_entry.find(class_="timestamp").text
            timestamp = to_unix_epoch(parse_timestamp(raw_timestamp))
            entries.append({
                "username": username,
                "user_id": user_id,
                "message_id": message_id,
                "timestamp": timestamp,
                "message": message,
                "count": count,
            })
    with open(DUMP_NAME, "w") as file:
        json.dump(entries, file)
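#Illustrative only: results.json ends up holding a flat list of objects
#shaped like
#    {"username": "...", "user_id": "...", "message_id": "...",
#     "timestamp": 1524700000.0, "message": "...", "count": 3}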
if not os.path.exists(DUMP_NAME):
    fetch_data()

with open(DUMP_NAME) as file:
    entries = json.load(file)

#two weeks ago
#earliest_date = to_unix_epoch(datetime.datetime.now() - datetime.timedelta(days=14))
#Jan 1, 1970
earliest_date = 0

entries = [entry for entry in entries if entry["timestamp"] > earliest_date]

users_by_id = {entry["user_id"]: entry["username"] for entry in entries}

stars_by_user = defaultdict(int)
for entry in entries:
    stars_by_user[entry["user_id"]] += entry["count"]

#print the ten most-starred users
for user_id, total in sorted(stars_by_user.items(), reverse=True, key=lambda t: t[1])[:10]:
    print(f"{users_by_id[user_id]}: {total}")