
@snydergd
Created January 18, 2022 23:28
StackOverflow Teams Reward Distribution Script
#!/usr/bin/env python3
import argparse
import json
import os
import random
from datetime import datetime, timedelta

import requests

parser = argparse.ArgumentParser()
parser.add_argument("--token", help="Stack Exchange API access token (sent as the X-API-Access-Token header)")
parser.add_argument("--key", help="Stack Exchange API key")
parser.add_argument("--run-date", type=lambda s: datetime.strptime(s, '%Y-%m-%d'),
                    help="run the report as of this date (YYYY-MM-DD); defaults to today")
""" requirements.txt
certifi==2021.10.8
requests==2.26.0
# Indirect (pinned versions)
charset-normalizer==2.0.9
idna==3.3
urllib3==1.26.8
"""
""" Example Config file (SO_awards.json):
{
"exclusions": [
{ "description": "Larry", "id": 13 },
{ "description": "Curly", "id": 325 },
{ "description": "Moe", "id": 6 }
],
"nominations": {
"202110": [
{ "nominee": "Neo", "nominator": "Trinity" }
]
}
}
"""
CONFIG_URL = "http://some_location/data/SO_awards.json"
API_URL_BASE = "https://api.stackexchange.com/2.3"
API_PARAM_TEAM = "stackoverflow.com/c/fedins"
CACHE_FOLDER = ".cache"
def get_cache_file(name):
    return os.path.join(CACHE_FOLDER, name)

def get_cached_updatetime(name):
    return datetime.fromtimestamp(os.path.getmtime(get_cache_file(name)))
def get_with_cache(name, fn, kwargs={}, valid_for=timedelta(days=1), compare_to=None):
    """Call fn(**kwargs) and cache its JSON-serializable result under CACHE_FOLDER/name.

    The cached copy is reused while its modification time plus valid_for is newer than compare_to.
    """
    if compare_to is None:
        # Evaluate "now" at call time rather than at import time.
        compare_to = datetime.now()
    if not os.path.isdir(CACHE_FOLDER):
        os.mkdir(CACHE_FOLDER)
    filename = get_cache_file(name)
    if os.path.exists(filename) and get_cached_updatetime(name) + valid_for > compare_to:
        print(f"using cached {name}")
        with open(filename, "r") as f:
            result = json.load(f)
    else:
        print(f"recreating {name}")
        result = fn(**kwargs)
        try:
            with open(filename, "w") as f:
                json.dump(result, f)
        except TypeError:
            # Result was not JSON-serializable; drop the partially written cache file.
            os.remove(filename)
    return result
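
# Illustrative use (hypothetical fetch_users callable):
#   users = get_with_cache("users", fetch_users, valid_for=timedelta(days=7))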

class Caller:
    run_date: datetime

    def __init__(self, config):
        self.token = config.token
        self.key = config.key
        if config.run_date:
            self.run_date = config.run_date
        else:
            self.run_date = datetime.today()

    def get_raw_data(self, endpoint, page=1, pagesize=30, params={}):  # defaults from SO docs
        params = {
            "key": self.key,
            "team": API_PARAM_TEAM,
            "site": "stackoverflow",
            "page": page,
            "pagesize": pagesize,
            **params
        }
        headers = {
            "X-API-Access-Token": self.token
        }
        request = requests.Request(method="GET", params=params, headers=headers, url=f"{API_URL_BASE}{endpoint}")
        preparedRequest = request.prepare()
        print(preparedRequest.url)
        session = requests.Session()
        response = session.send(preparedRequest)
        data = response.json()
        return data

    def get_all_data(self, endpoint, pagesize=100, params={}):  # maximum page size is defaulted here
        page = 1
        data = None
        try:
            data = self.get_raw_data(endpoint, page, pagesize, params)
            for item in data["items"]:
                yield item
            while data["has_more"]:
                page += 1
                data = self.get_raw_data(endpoint, page, pagesize, params)
                for item in data["items"]:
                    yield item
        except Exception as e:
            # Dump whatever response was last seen to make API errors easier to diagnose.
            print(e)
            print(json.dumps(data, indent=2))

    # For more information, see the APIs: https://api.stackexchange.com/docs
    # Also have a look at the teams-specific ones: https://api.stackexchange.com/docs/teams
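    # A prepared request printed by get_raw_data looks roughly like this (values illustrative):
    #   https://api.stackexchange.com/2.3/users?key=...&team=stackoverflow.com%2Fc%2Ffedins&site=stackoverflow&page=1&pagesize=100&order=desc&sort=reputation
    # with the access token sent in the X-API-Access-Token header.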

    def get_previous_quarter(self):
        # Returns (start, end) datetimes bounding the most recently completed calendar quarter.
        d = self.run_date
        year = d.year
        month = (d.month-1) - (d.month-1)%3 - 3  # zero-based first month of the previous quarter
        endyear = year
        endmonth = month+3
        if month < 0:
            month += 12
            year -= 1
        return (datetime(year, month+1, 1), datetime(endyear, endmonth+1, 1))
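    # Example: with a run_date of 2022-01-18 this returns
    # (datetime(2021, 10, 1), datetime(2022, 1, 1)), i.e. Q4 2021.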

    def get_previous_month(self):
        # Returns (start, end) datetimes bounding the most recently completed calendar month.
        month = self.run_date.month-1
        year = self.run_date.year
        if month < 1:
            month = 12
            year -= 1
        return (datetime(year, month, 1), datetime(self.run_date.year, self.run_date.month, 1))
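    # Example: with a run_date of 2022-01-18 this returns
    # (datetime(2021, 12, 1), datetime(2022, 1, 1)), i.e. December 2021.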

    def get_retrieval_timerange(self):
        # Single range covering both the previous quarter and the previous month.
        dates = self.get_previous_quarter() + self.get_previous_month()
        return (min(dates), max(dates))

    def get_user_list(self):
        return list(self.get_all_data("/users", params={
            "filter": "!0ZJMp6Z5IQ3kGdOOMEatVA*mw",
            "order": "desc",
            "sort": "reputation"
        }))

    def get_rep_change_for_time_range_and_ids(self, ids, start, end):
        return list(self.get_all_data(f"/users/{ids}/reputation-history", params={
            "fromdate": int(start.timestamp()),
            "todate": int(end.timestamp())
        }))
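    # `ids` is a semicolon-separated id list as used by the Stack Exchange API's
    # vectorized endpoints, e.g. "13;325;6"; run() builds these in groups of up to 100.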

    def run(self):
        ###
        print(f"== Report information {str(self.run_date)}")
        print(f"Previous month range: {[str(x) for x in self.get_previous_month()]}")
        print(f"Previous quarter range: {[str(x) for x in self.get_previous_quarter()]}")
        print(f"Range to retrieve: {[str(x) for x in self.get_retrieval_timerange()]}")
        ###
        print("")
        print("== Collecting data")
        a_week = timedelta(days=7)
        period = self.get_retrieval_timerange()
        # Fetch (or reuse) the user list first so the cache file exists before its
        # modification time is read.
        users = get_with_cache("users", self.get_user_list, valid_for=a_week)
        user_cache_time = get_cached_updatetime("users")
        # Query reputation history in groups of up to 100 user ids per request.
        id_groups = [";".join(items) for items in [
            map(lambda x: str(x['user_id']), users[a:a+100]) for a in range(0, len(users), 100)
        ]]
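        # For example, 250 users yield three id groups of 100, 100, and 50 ids.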
        rep_data = []
        for (i, id_group) in enumerate(id_groups):
            cache_key = f"reputation_{i}"
            # Tie reputation-cache freshness to the user-list cache: refreshing the
            # user list forces these chunks to be refetched as well.
            rep = get_with_cache(
                cache_key,
                self.get_rep_change_for_time_range_and_ids,
                valid_for=timedelta(0),
                compare_to=min(user_cache_time, get_cached_updatetime(cache_key) + a_week if os.path.exists(get_cache_file(cache_key)) else user_cache_time),
                kwargs={
                    "ids": id_group,
                    "start": period[0],
                    "end": period[1]
                }
            )
            rep_data += rep
        # Aggregate reputation changes per user for each reporting interval.
        range_stats = {
            "quarterly": {
                "range": self.get_previous_quarter(),
                "data": {}
            },
            "monthly": {
                "range": self.get_previous_month(),
                "data": {}
            }
        }
        for rep in rep_data:
            d = datetime.fromtimestamp(rep['creation_date'])
            for interval in range_stats.values():
                if d > interval["range"][0] and d < interval["range"][1]:
                    user_id = rep['user_id']
                    if user_id not in interval["data"]:
                        interval["data"][user_id] = {'rep': 0, 'dates': [], 'id': user_id}
                    interval["data"][user_id]['rep'] += rep['reputation_change']
                    interval["data"][user_id]['dates'].append((rep['reputation_change'], rep['reputation_history_type'], str(d)))
        # Rank users by total reputation gained in each interval, highest first.
        id_mapping = {item['user_id']: item for item in users}
        ratings = {
            category: [
                [id_mapping[id]['display_name'], detail] for (id, detail) in sorted(range_stats[category]["data"].items(), reverse=True, key=lambda x: x[1]['rep'])
            ]
            for category in range_stats.keys()
        }
        with open(get_cache_file("aggregate"), "w") as f:
            json.dump(ratings, f, indent=' ')
        ###
        print("")
        print("== Finding winners")
        nomination_month_key = self.run_date.strftime("%Y%m")
        # Pull exclusions and nominations from the shared config file.
        config_data = requests.get(CONFIG_URL).json()
        nomination_data = config_data["nominations"]
        excluded_from_awards = list(map(lambda exclusion: exclusion["id"], config_data["exclusions"]))
        print(f"These users are being excluded from awards: {', '.join([x['description'] + ' (' + str(x['id']) + ')' for x in config_data['exclusions']])}")
        ###
        print("")
        print("== These are the winners!")
        top_10_month = [f"\n {rating[0]} - {rating[1]['rep']}" for rating in [x for x in ratings["monthly"] if x[1]['id'] not in excluded_from_awards][:10]]
        top_5_month = top_10_month[:5]
        top_2_quarter = [f"\n {rating[0]} - {rating[1]['rep']}" for rating in [x for x in ratings["quarterly"] if x[1]['id'] not in excluded_from_awards][:2]]
        print(f"Quarterly Top 2 (winner): {''.join(top_2_quarter)}")
        print(f"Monthly Top 5 (winner): {''.join(top_5_month)}")
        print(f"Monthly Top 10 for drawing: {''.join(top_10_month)}")
        if nomination_month_key in nomination_data:
            nominations = [f"\n {nomination['nominee']} (by {nomination['nominator']})" for nomination in nomination_data[nomination_month_key]]
            print(f"Monthly drawing entries from nominations: {''.join(nominations)}")
        else:
            nominations = []
            print("There are no additional drawing entries from nominations this month")
        # Randomly draw four winners from the monthly top 10 plus any nominations
        # (random.sample requires at least four entries).
        drawing_winners = random.sample(top_10_month + nominations, 4)
        print(f"Monthly drawing winners: {''.join(drawing_winners)}")

if __name__ == "__main__":
    caller = Caller(parser.parse_args())
    caller.run()