Created
February 3, 2022 06:15
-
-
Save neelsomani/1e9dd03240278f754e3e79bd2739e3b2 to your computer and use it in GitHub Desktop.
Comparison of different ranking algorithms: https://neelsomani.com/blog/improving-elo-ranking-system.php
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np
import random
import math
import logging

# Number of players in each simulated tournament.
N_PLAYERS = 5
# Module-level logger shared by all ranking code in this file.
LOGGER = logging.getLogger('elo')
class Ranker:
    """Abstract base class for rating algorithms.

    Subclasses implement ``rank``, which maps a list of game outcomes to a
    per-player score vector (higher means stronger).

    (The original defined a no-op ``__init__``; the implicit default is
    identical, so it was removed.)
    """

    @staticmethod
    def rank(games):
        """Return a sequence of per-player scores for the given games.

        Args:
            games: iterable of ``((i, j), o)`` where ``i`` and ``j`` are
                player indices and ``o`` is 1 if player ``i`` won,
                0 if player ``j`` won.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError
class Elo(Ranker):
    """Classic sequential Elo rating updates."""

    # Maximum rating adjustment per game.
    K_FACTOR = 50
    # Rating every player starts from.
    START_SCORE = 1200
    # Standard Elo scale: a 400-point gap means 10:1 expected odds.
    SCALE = 400

    @staticmethod
    def _pr_win(rating_a, rating_b):
        """Expected score of player A against player B.

        BUG FIX: the original computed ``10 ** (rating_b - rating_a)``
        without dividing by 400, so a gap of even a few rating points
        saturated the probability at ~0 or ~1.  The standard Elo
        expectation is 1 / (1 + 10 ** ((Rb - Ra) / 400)).
        """
        return 1 / (1 + 10 ** ((rating_b - rating_a) / Elo.SCALE))

    @staticmethod
    def rank(games):
        """Return an np.array of final ratings after sequential updates.

        Args:
            games: iterable of ``((i, j), o)`` with ``o`` = 1 when player
                ``i`` won, 0 when player ``j`` won.  Order matters: Elo is
                sequential.
        """
        scores = [Elo.START_SCORE for _ in range(N_PLAYERS)]
        for ((i, j), o) in games:
            rating_i, rating_j = scores[i], scores[j]
            # Both updates are computed from the pre-game ratings so the
            # adjustment is symmetric.
            scores[i], scores[j] = (
                rating_i + Elo.K_FACTOR * (o - Elo._pr_win(rating_i, rating_j)),
                rating_j + Elo.K_FACTOR * ((1 - o) - Elo._pr_win(rating_j, rating_i))
            )
        return np.array(scores)
class Glicko(Ranker):
    """One Glicko rating period applied to all games as a single batch.

    Unlike sequential Elo, the result is independent of game order: every
    player's update is computed from the shared initial ratings.
    """

    # Initial rating deviation (maximum uncertainty about a player).
    RD = 350
    # q = ln(10) / 400 in Glickman's formulation.
    Q = .0057565
    # g(RD): attenuation factor for an opponent at maximum deviation.
    # (The original hard-coded 3.14 for pi and repeated the Q/RD literals.)
    G = 1 / (1 + 3 * Q ** 2 * RD ** 2 / math.pi ** 2) ** .5

    @staticmethod
    def _expected_score(r, r_j):
        """Expected score of a player rated ``r`` against one rated ``r_j``."""
        return 1 / (1 + 10 ** (-Glicko.G * (r - r_j) / 400))

    @staticmethod
    def rank(games):
        """Return a list of post-period ratings, one per player.

        Args:
            games: iterable of ``((i, j), o)`` with ``o`` = 1 when player
                ``i`` won, 0 when player ``j`` won.
        """
        initial_scores = [Elo.START_SCORE for _ in range(N_PLAYERS)]
        final_scores = []
        for i in range(N_PLAYERS):
            # Accumulate the reciprocal of d^2 over all of player i's games.
            d_sq_inv_term = 0
            for j in range(N_PLAYERS):
                for ((p_1, p_2), _) in games:
                    if p_1 == i and p_2 == j or p_1 == j and p_2 == i:
                        ex_s = Glicko._expected_score(
                            initial_scores[i],
                            initial_scores[j]
                        )
                        d_sq_inv_term += Glicko.G ** 2 * ex_s * (1 - ex_s)
            # BUG FIX: the original inverted this term unconditionally, so a
            # player with no games raised ZeroDivisionError.  Such a player's
            # rating is simply left unchanged.
            if d_sq_inv_term == 0:
                final_scores.append(initial_scores[i])
                continue
            d_sq = (Glicko.Q ** 2 * d_sq_inv_term) ** -1
            # Numerator sum of the rating update over all of i's games.
            inner_sum = 0
            for j in range(N_PLAYERS):
                for ((p_1, p_2), o) in games:
                    if p_1 == i and p_2 == j:
                        pass  # outcome already from i's perspective
                    elif p_1 == j and p_2 == i:
                        o = 1 - o  # flip outcome to i's perspective
                    else:
                        continue
                    ex_score = Glicko._expected_score(
                        initial_scores[i],
                        initial_scores[j]
                    )
                    inner_sum += Glicko.G * (o - ex_score)
            score_update = Glicko.Q / (1 / Glicko.RD ** 2 + 1 / d_sq) * inner_sum
            final_scores.append(initial_scores[i] + score_update)
        return final_scores
class SampleElo(Ranker):
    """Elo averaged over many random orderings of the game sequence.

    Sequential Elo is order-dependent; averaging the ratings over random
    permutations of the same games removes that ordering bias.
    """

    # Number of random game orderings to average over.
    N_RANDOM_SAMPLES = 100

    @staticmethod
    def _permute(lst):
        """Return a uniformly random permutation of ``lst`` as a new list.

        (Replaces the original decorate-sort-undecorate trick with the
        stdlib idiom: ``random.sample`` with the full population size is a
        shuffle that leaves the input untouched.)
        """
        return random.sample(lst, len(lst))

    @staticmethod
    def rank(games):
        """Return the mean Elo rating vector over N_RANDOM_SAMPLES orderings."""
        return np.mean(np.array([
            Elo.rank(SampleElo._permute(games))
            for _ in range(SampleElo.N_RANDOM_SAMPLES)
        ]), axis=0)
class SysEqElo(Ranker):
    """Elo ratings recovered in one shot by least squares.

    Each pair's empirical win ratio implies
    (r_i - r_j) / 400 = log10(i_won / j_won); stacking those equations
    (plus an anchor) and solving by least squares yields the ratings.
    """

    @staticmethod
    def _get_games_won(games, i, j):
        """Count games recorded as pair ``(i, j)`` that player ``i`` won."""
        return sum(a == i and b == j and o == 1 for ((a, b), o) in games)

    @staticmethod
    def _get_total_games(games, i, j):
        """Count all games recorded as pair ``(i, j)``."""
        return sum(a == i and b == j for ((a, b), o) in games)

    @staticmethod
    def _get_sys_eq(games):
        """Build the linear system X r = b from observed win ratios.

        Pairs are assumed to be stored with i < j (``get_games``
        canonicalizes them that way).  Each usable pair contributes
        r_i / 400 - r_j / 400 = log10(i_won) - log10(j_won).
        (The original comment had the sign flipped.)
        """
        X = []
        b = []
        for i in range(0, N_PLAYERS - 1):
            for j in range(i + 1, N_PLAYERS):
                total = SysEqElo._get_total_games(games, i, j)
                if total == 0:
                    continue
                i_won = SysEqElo._get_games_won(games, i, j)
                j_won = total - i_won
                # BUG FIX (resolves the original FIXME): a shutout pair made
                # log10(0) raise a math domain error.  A shutout yields no
                # finite equation, so the pair is skipped.
                if i_won == 0 or j_won == 0:
                    continue
                eq = np.zeros(N_PLAYERS)
                eq[i] = 1 / 400
                eq[j] = -1 / 400
                X.append(eq)
                b.append(math.log10(i_won) - math.log10(j_won))
        return X, b

    @staticmethod
    def _append_constraint(X, b):
        """Anchor player 0 at 1200; only relative ratings are identified."""
        new_constraint = np.zeros(N_PLAYERS)
        new_constraint[0] = 1
        return np.vstack([X, new_constraint]), np.append(b, 1200)

    @staticmethod
    def _solve_system(X, b):
        """Least-squares solution of X r = b."""
        return np.linalg.lstsq(X, b, rcond=None)[0]

    @staticmethod
    def rank(games):
        """Return the least-squares Elo rating vector for ``games``."""
        X, b = SysEqElo._get_sys_eq(games)
        X, b = SysEqElo._append_constraint(np.array(X), np.array(b))
        return SysEqElo._solve_system(X, b)
class PageRank(Ranker):
    """Ratings from the stationary distribution of a "who loses to whom" chain."""

    @staticmethod
    def _create_win_loss_matrix(games):
        """Return a column-stochastic matrix of losses.

        Entry ``[p_2][p]`` counts games that player ``p`` lost to ``p_2``.
        A player with no losses would produce an all-zero column (and a
        division by zero), so that column is made uniform instead — the
        usual dangling-node fix.
        """
        outcomes = np.zeros((N_PLAYERS, N_PLAYERS))
        for p in range(N_PLAYERS):
            for p_2 in range(N_PLAYERS):
                games_p_lost_to_p_2 = sum(
                    p == i and p_2 == j and o == 0
                    or p == j and p_2 == i and o == 1
                    for ((i, j), o) in games
                )
                outcomes[p_2][p] = games_p_lost_to_p_2
        for p in range(N_PLAYERS):
            if (outcomes[:, p] == 0).all():
                outcomes[:, p] = 1
        return outcomes / outcomes.sum(axis=0)

    @staticmethod
    def rank(games):
        """Return the eigenvector of the loss matrix for eigenvalue 1.

        By Perron-Frobenius, that eigenvector is one-signed, so its
        absolute value is a valid score vector.

        Raises:
            RuntimeError: if no eigenvalue is within 1e-5 of 1, or the
                matching eigenvector is mixed-sign.
        """
        mtx = PageRank._create_win_loss_matrix(games)
        eigenvalues, eigenvectors = np.linalg.eig(mtx)
        for eig, vec in zip(eigenvalues, eigenvectors.T):
            if abs(eig - 1) < 10 ** -5:
                # BUG FIX: np.linalg.eig returns complex arrays whenever any
                # eigenvalue is complex, and ordering comparisons (>=) on a
                # complex dtype raise TypeError.  The eigenvector for the
                # real eigenvalue 1 is real up to rounding, so compare and
                # return its real part.
                vec = np.real(vec)
                if not (vec >= 0).all() and not (vec <= 0).all():
                    raise RuntimeError(f'Violation of Perron-Frobenius: {vec}')
                return np.abs(vec)
        raise RuntimeError('Did not find eigenvalue=1')
def get_ranking_algorithms():
    """Map algorithm names to their Ranker classes.

    'sample_elo' is left commented out as in the original — presumably
    because it runs Elo N_RANDOM_SAMPLES times per call; confirm before
    re-enabling.
    """
    return {
        'standard_elo': Elo,
        # 'sample_elo': SampleElo,
        'system_of_equations': SysEqElo,
        'page_rank': PageRank,
        'glicko': Glicko
    }
def get_games(players_strength, n):
    """Simulate ``n`` random pairwise games among N_PLAYERS players.

    Each game draws two distinct players uniformly; a player's chance of
    winning is their share of the pair's combined strength.  A game is
    recorded as ``((i, j), o)`` with ``i < j`` and ``o`` = 1 if player
    ``i`` won, 0 if player ``j`` won.
    """
    possible_outcomes = [0, 1]
    player_ids = list(range(N_PLAYERS))
    games = []
    for _ in range(n):
        first, second = np.random.choice(player_ids, size=2, replace=False)
        # Canonicalize the pair so the lower index always comes first.
        low, high = min(first, second), max(first, second)
        strength_sum = players_strength[low] + players_strength[high]
        # 0 if the higher-indexed player wins, 1 if the lower-indexed does.
        result = np.random.choice(
            possible_outcomes,
            p=[
                players_strength[high] / strength_sum,
                players_strength[low] / strength_sum
            ]
        )
        games.append(((low, high), result))
    return games
def get_rankings(lst):
    """Return the indices of ``lst`` ordered from smallest to largest value.

    Ties keep their original index order (stable sort), matching the
    behavior of sorting (index, value) pairs by value.
    """
    return sorted(range(len(lst)), key=lambda idx: lst[idx])
def get_corr_coefs():
    """Run one simulated tournament and score every ranking algorithm.

    Returns:
        A tuple with, for each algorithm in ``get_ranking_algorithms``
        order, the Pearson correlation between the true strength ranking
        and the algorithm's inferred ranking.
    """
    players_strength = [int(random.random() * 1000) for _ in range(N_PLAYERS)]
    actual_ranks = get_rankings(players_strength)
    # BUG FIX: the original message said 'Player strengths' but logged the
    # ranks; log both.  %-style args also defer formatting unless DEBUG is on.
    LOGGER.debug('Player strengths: %s; ranks: %s', players_strength, actual_ranks)
    games = get_games(players_strength, 500)
    coefs = []
    for name, alg in get_ranking_algorithms().items():
        rankings = get_rankings(alg.rank(games))
        coefs.append(np.corrcoef(actual_ranks, rankings)[0, 1])
        LOGGER.debug('%s: %s', name, coefs[-1])
    return tuple(coefs)
def avg_corr_coefs(n=400):
    """Average each algorithm's rank correlation over ``n`` simulations.

    Iterations where ``get_corr_coefs`` raises ValueError are logged and
    discarded.

    Args:
        n: number of simulated tournaments to run.

    Returns:
        dict mapping algorithm name to its mean correlation coefficient
        over the successful iterations.
    """
    scores = {name: 0 for name in get_ranking_algorithms()}
    successful_iterations = 0
    for i in range(n):
        LOGGER.info('Iteration: %s', i)
        # get_corr_coefs builds its whole tuple before returning, so a
        # ValueError means scores received nothing from this iteration.
        try:
            coefs = get_corr_coefs()
        except ValueError as ex:
            LOGGER.info(ex)
            continue
        for name, coef in zip(scores, coefs):
            scores[name] += coef
        # BUG FIX: the original incremented the divisor on every iteration,
        # including failed ones that contributed nothing, biasing every
        # average toward zero.
        successful_iterations += 1
    if successful_iterations == 0:
        # Preserves the original's failure type when nothing succeeded,
        # but with an explicit message.
        raise ZeroDivisionError('every iteration raised ValueError')
    return {k: v / successful_iterations for k, v in scores.items()}
if __name__ == '__main__': | |
logging.basicConfig(level=logging.INFO) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment